You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:14 UTC

[45/47] flink git commit: [FLINK-5343] [table] Add support to overwrite files with CsvTableSink.

[FLINK-5343] [table] Add support to overwrite files with CsvTableSink.

This closes #3011.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc34c145
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc34c145
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc34c145

Branch: refs/heads/master
Commit: cc34c1450486e5596f6af117e77cac6da93fad03
Parents: ffe9ec8
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Dec 15 11:01:39 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/table/sinks/CsvTableSink.scala | 67 +++++++++++++++++---
 1 file changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc34c145/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 9cf76dd..c37ee74 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -23,33 +23,82 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.types.Row
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
   * A simple [[TableSink]] to emit data as CSV files.
   *
   * @param path The output path to write the Table to.
-  * @param fieldDelim The field delimiter, ',' by default.
+  * @param fieldDelim The field delimiter.
+  * @param numFiles The number of files to write to.
+  * @param writeMode The write mode to specify whether existing files are overwritten or not.
   */
 class CsvTableSink(
     path: String,
-    fieldDelim: String = ",")
+    fieldDelim: Option[String],
+    numFiles: Option[Int],
+    writeMode: Option[WriteMode])
   extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
 
+  /**
+    * A simple [[TableSink]] to emit data as CSV files.
+    *
+    * @param path The output path to write the Table to.
+    * @param fieldDelim The field delimiter, ',' by default.
+    */
+  def this(path: String, fieldDelim: String = ",") {
+    this(path, Some(fieldDelim), None, None)
+  }
+
+  /**
+    * A simple [[TableSink]] to emit data as CSV files.
+    *
+    * @param path The output path to write the Table to.
+    * @param fieldDelim The field delimiter.
+    * @param numFiles The number of files to write to.
+    * @param writeMode The write mode to specify whether existing files are overwritten or not.
+    */
+  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
+    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
+  }
+
   override def emitDataSet(dataSet: DataSet[Row]): Unit = {
-    dataSet
-      .map(new CsvFormatter(fieldDelim))
-      .writeAsText(path)
+    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))
+
+    if (numFiles.isDefined) {
+      csvRows.setParallelism(numFiles.get)
+    }
+
+    val sink = writeMode match {
+      case None => csvRows.writeAsText(path)
+      case Some(wm) => csvRows.writeAsText(path, wm)
+    }
+
+    if (numFiles.isDefined) {
+      sink.setParallelism(numFiles.get)
+    }
   }
 
   override def emitDataStream(dataStream: DataStream[Row]): Unit = {
-    dataStream
-      .map(new CsvFormatter(fieldDelim))
-      .writeAsText(path)
+    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))
+
+    if (numFiles.isDefined) {
+      csvRows.setParallelism(numFiles.get)
+    }
+
+    val sink = writeMode match {
+      case None => csvRows.writeAsText(path)
+      case Some(wm) => csvRows.writeAsText(path, wm)
+    }
+
+    if (numFiles.isDefined) {
+      sink.setParallelism(numFiles.get)
+    }
   }
 
   override protected def copy: TableSinkBase[Row] = {
-    new CsvTableSink(path, fieldDelim)
+    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
   }
 
   override def getOutputType: TypeInformation[Row] = {