You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:14 UTC
[45/47] flink git commit: [FLINK-5343] [table] Add support to overwrite files with CsvTableSink.
[FLINK-5343] [table] Add support to overwrite files with CsvTableSink.
This closes #3011.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc34c145
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc34c145
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc34c145
Branch: refs/heads/master
Commit: cc34c1450486e5596f6af117e77cac6da93fad03
Parents: ffe9ec8
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Dec 15 11:01:39 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100
----------------------------------------------------------------------
.../apache/flink/table/sinks/CsvTableSink.scala | 67 +++++++++++++++++---
1 file changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc34c145/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 9cf76dd..c37ee74 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -23,33 +23,82 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.datastream.DataStream
/**
* A simple [[TableSink]] to emit data as CSV files.
*
* @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter, ',' by default.
+ * @param fieldDelim The field delimiter
+ * @param numFiles The number of files to write to
+ * @param writeMode The write mode to specify whether existing files are overwritten or not.
*/
class CsvTableSink(
path: String,
- fieldDelim: String = ",")
+ fieldDelim: Option[String],
+ numFiles: Option[Int],
+ writeMode: Option[WriteMode])
extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
+ /**
+ * A simple [[TableSink]] to emit data as CSV files.
+ *
+ * @param path The output path to write the Table to.
+ * @param fieldDelim The field delimiter, ',' by default.
+ */
+ def this(path: String, fieldDelim: String = ",") {
+ this(path, Some(fieldDelim), None, None)
+ }
+
+ /**
+ * A simple [[TableSink]] to emit data as CSV files.
+ *
+ * @param path The output path to write the Table to.
+ * @param fieldDelim The field delimiter.
+ * @param numFiles The number of files to write to.
+ * @param writeMode The write mode to specify whether existing files are overwritten or not.
+ */
+ def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
+ this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
+ }
+
override def emitDataSet(dataSet: DataSet[Row]): Unit = {
- dataSet
- .map(new CsvFormatter(fieldDelim))
- .writeAsText(path)
+ val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))
+
+ if (numFiles.isDefined) {
+ csvRows.setParallelism(numFiles.get)
+ }
+
+ val sink = writeMode match {
+ case None => csvRows.writeAsText(path)
+ case Some(wm) => csvRows.writeAsText(path, wm)
+ }
+
+ if (numFiles.isDefined) {
+ sink.setParallelism(numFiles.get)
+ }
}
override def emitDataStream(dataStream: DataStream[Row]): Unit = {
- dataStream
- .map(new CsvFormatter(fieldDelim))
- .writeAsText(path)
+ val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))
+
+ if (numFiles.isDefined) {
+ csvRows.setParallelism(numFiles.get)
+ }
+
+ val sink = writeMode match {
+ case None => csvRows.writeAsText(path)
+ case Some(wm) => csvRows.writeAsText(path, wm)
+ }
+
+ if (numFiles.isDefined) {
+ sink.setParallelism(numFiles.get)
+ }
}
override protected def copy: TableSinkBase[Row] = {
- new CsvTableSink(path, fieldDelim)
+ new CsvTableSink(path, fieldDelim, numFiles, writeMode)
}
override def getOutputType: TypeInformation[Row] = {