You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/03 09:46:30 UTC

[flink] 02/03: [FLINK-12977][table] Port CsvTableSink to api-java-bridge

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 981a54d6718753ca5b2cd68259b72b4ecea9008a
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jun 26 11:46:53 2019 +0800

    [FLINK-12977][table] Port CsvTableSink to api-java-bridge
---
 flink-python/pyflink/table/sinks.py                |  14 +-
 .../org/apache/flink/table/sinks/CsvTableSink.java | 187 +++++++++++++++++++++
 .../apache/flink/table/sinks/CsvTableSink.scala    | 140 ---------------
 .../runtime/batch/table/TableSinkITCase.scala      |   2 +-
 4 files changed, 193 insertions(+), 150 deletions(-)

diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 4aa968f..deb5b45 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -57,24 +57,20 @@ class CsvTableSink(TableSink):
                        and :data:`WriteMode.OVERWRITE`.
     """
 
-    def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1,
+    def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1,
                  write_mode=None):
         # type: (list[str], list[DataType], str, str, int, int) -> None
         gateway = get_gateway()
         if write_mode == WriteMode.NO_OVERWRITE:
-            j_write_mode = gateway.jvm.scala.Option.apply(
-                gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE)
+            j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE
         elif write_mode == WriteMode.OVERWRITE:
-            j_write_mode = gateway.jvm.scala.Option.apply(
-                gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
+            j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
         elif write_mode is None:
-            j_write_mode = gateway.jvm.scala.Option.empty()
+            j_write_mode = None
         else:
             raise Exception('Unsupported write_mode: %s' % write_mode)
-        j_some_field_delimiter = gateway.jvm.scala.Option.apply(field_delimiter)
-        j_some_num_files = gateway.jvm.scala.Option.apply(num_files)
         j_csv_table_sink = gateway.jvm.CsvTableSink(
-            path, j_some_field_delimiter, j_some_num_files, j_write_mode)
+            path, field_delimiter, num_files, j_write_mode)
         j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
         j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
                                         [_to_java_type(field_type) for field_type in field_types])
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
new file mode 100644
index 0000000..f950f4d
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple {@link TableSink} to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+	private final String path;
+	private final String fieldDelim;
+	private final int numFiles;
+	private final FileSystem.WriteMode writeMode;
+
+	// assigned only on the copy created by configure(); null until then
+	private String[] fieldNames;
+	private TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files.
+	 *
+	 * @param path       The output path to write the Table to.
+	 * @param fieldDelim The field delimiter; {@code null} falls back to {@code ","}.
+	 * @param numFiles   The number of files to write to; non-positive values keep the default parallelism.
+	 * @param writeMode  The write mode to specify whether existing files are overwritten or not;
+	 *                   {@code null} uses the default of {@code writeAsText(path)}.
+	 */
+	public CsvTableSink(
+		String path,
+		String fieldDelim,
+		int numFiles,
+		FileSystem.WriteMode writeMode) {
+		this.path = path;
+		this.fieldDelim = fieldDelim == null ? "," : fieldDelim;
+		this.numFiles = numFiles;
+		this.writeMode = writeMode;
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files using comma as field delimiter, with default
+	 * parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 */
+	public CsvTableSink(String path) {
+		this(path, ",");
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files, with default parallelism and write mode.
+	 *
+	 * @param path       The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 */
+	public CsvTableSink(String path, String fieldDelim) {
+		this(path, fieldDelim, -1, null);
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		MapOperator<Row, String> csvRows = dataSet.map(new CsvFormatter(fieldDelim));
+		// a null write mode defers to the default of writeAsText(path)
+		DataSink<String> sink = writeMode == null
+			? csvRows.writeAsText(path)
+			: csvRows.writeAsText(path, writeMode);
+
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+		SingleOutputStreamOperator<String> csvRows = dataStream.map(new CsvFormatter(fieldDelim));
+		// a null write mode defers to the default of writeAsText(path)
+		DataStreamSink<String> sink = writeMode == null
+			? csvRows.writeAsText(path)
+			: csvRows.writeAsText(path, writeMode);
+
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	/**
+	 * Returns a copy of this sink carrying the given field names and types; this sink is unchanged.
+	 */
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		CsvTableSink configuredSink = new CsvTableSink(path, fieldDelim, numFiles, writeMode);
+		configuredSink.fieldNames = fieldNames;
+		configuredSink.fieldTypes = fieldTypes;
+		return configuredSink;
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	/**
+	 * Formats a Row into a String with fields separated by the field delimiter.
+	 *
+	 * <p>The delimiter is emitted between every pair of adjacent fields, so null or empty
+	 * fields still occupy their column, e.g. {@code (null, "a")} becomes {@code ",a"}.
+	 */
+	public static class CsvFormatter implements MapFunction<Row, String> {
+		private static final long serialVersionUID = 1L;
+
+		private final String fieldDelim;
+
+		/**
+		 * Constructor with field delimiter.
+		 *
+		 * @param fieldDelim The field delimiter.
+		 */
+		CsvFormatter(String fieldDelim) {
+			this.fieldDelim = fieldDelim;
+		}
+
+		@Override
+		public String map(Row row) {
+			StringBuilder builder = new StringBuilder();
+			for (int i = 0; i < row.getArity(); i++) {
+				// position-based, not builder.length()-based: a leading null or empty
+				// field must not swallow the delimiter that separates it from the next one
+				if (i > 0) {
+					builder.append(fieldDelim);
+				}
+				Object field = row.getField(i);
+				if (field != null) {
+					builder.append(field);
+				}
+			}
+			return builder.toString();
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
deleted file mode 100644
index 8037f4d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.utils.TableConnectorUtils
-
-/**
-  * A simple [[TableSink]] to emit data as CSV files.
-  *
-  * @param path The output path to write the Table to.
-  * @param fieldDelim The field delimiter
-  * @param numFiles The number of files to write to
-  * @param writeMode The write mode to specify whether existing files are overwritten or not.
-  */
-class CsvTableSink(
-    path: String,
-    fieldDelim: Option[String],
-    numFiles: Option[Int],
-    writeMode: Option[WriteMode])
-  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter, ',' by default.
-    */
-  def this(path: String, fieldDelim: String = ",") {
-    this(path, Some(fieldDelim), None, None)
-  }
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter.
-    * @param numFiles The number of files to write to.
-    * @param writeMode The write mode to specify whether existing files are overwritten or not.
-    */
-  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
-    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
-  }
-
-  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
-    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
-    if (numFiles.isDefined) {
-      csvRows.setParallelism(numFiles.get)
-    }
-
-    val sink = writeMode match {
-      case None => csvRows.writeAsText(path)
-      case Some(wm) => csvRows.writeAsText(path, wm)
-    }
-
-    if (numFiles.isDefined) {
-      sink.setParallelism(numFiles.get)
-    }
-
-    sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
-  }
-
-  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
-    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
-    if (numFiles.isDefined) {
-      csvRows.setParallelism(numFiles.get)
-    }
-
-    val sink = writeMode match {
-      case None => csvRows.writeAsText(path)
-      case Some(wm) => csvRows.writeAsText(path, wm)
-    }
-
-    if (numFiles.isDefined) {
-      sink.setParallelism(numFiles.get)
-    }
-
-    sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
-  }
-
-  override protected def copy: TableSinkBase[Row] = {
-    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
-  }
-
-  override def getOutputType: TypeInformation[Row] = {
-    new RowTypeInfo(getFieldTypes: _*)
-  }
-}
-
-/**
-  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
-  *
-  * @param fieldDelim The field delimiter.
-  */
-class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
-  override def map(row: Row): String = {
-
-    val builder = new StringBuilder
-
-    // write first value
-    val v = row.getField(0)
-    if (v != null) {
-      builder.append(v.toString)
-    }
-
-    // write following values
-    for (i <- 1 until row.getArity) {
-      builder.append(fieldDelim)
-      val v = row.getField(i)
-      if (v != null) {
-        builder.append(v.toString)
-      }
-    }
-    builder.mkString
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d64df26..1e88e01 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -54,7 +54,7 @@ class TableSinkITCase(
 
     tEnv.registerTableSink(
       "testSink",
-      new CsvTableSink(path, fieldDelim = "|").configure(
+      new CsvTableSink(path, "|").configure(
         Array[String]("c", "b"), Array[TypeInformation[_]](Types.STRING, Types.LONG)))
 
     val input = CollectionDataSets.get3TupleDataSet(env)