You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/03 09:46:30 UTC
[flink] 02/03: [FLINK-12977][table] Port CsvTableSink to
api-java-bridge
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 981a54d6718753ca5b2cd68259b72b4ecea9008a
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jun 26 11:46:53 2019 +0800
[FLINK-12977][table] Port CsvTableSink to api-java-bridge
---
flink-python/pyflink/table/sinks.py | 14 +-
.../org/apache/flink/table/sinks/CsvTableSink.java | 187 +++++++++++++++++++++
.../apache/flink/table/sinks/CsvTableSink.scala | 140 ---------------
.../runtime/batch/table/TableSinkITCase.scala | 2 +-
4 files changed, 193 insertions(+), 150 deletions(-)
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 4aa968f..deb5b45 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -57,24 +57,20 @@ class CsvTableSink(TableSink):
and :data:`WriteMode.OVERWRITE`.
"""
- def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1,
+ def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1,
write_mode=None):
# type: (list[str], list[DataType], str, str, int, int) -> None
gateway = get_gateway()
if write_mode == WriteMode.NO_OVERWRITE:
- j_write_mode = gateway.jvm.scala.Option.apply(
- gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE)
+ j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE
elif write_mode == WriteMode.OVERWRITE:
- j_write_mode = gateway.jvm.scala.Option.apply(
- gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
+ j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
elif write_mode is None:
- j_write_mode = gateway.jvm.scala.Option.empty()
+ j_write_mode = None
else:
raise Exception('Unsupported write_mode: %s' % write_mode)
- j_some_field_delimiter = gateway.jvm.scala.Option.apply(field_delimiter)
- j_some_num_files = gateway.jvm.scala.Option.apply(num_files)
j_csv_table_sink = gateway.jvm.CsvTableSink(
- path, j_some_field_delimiter, j_some_num_files, j_write_mode)
+ path, field_delimiter, num_files, j_write_mode)
j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(field_type) for field_type in field_types])
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
new file mode 100644
index 0000000..f950f4d
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple {@link TableSink} to emit data as CSV files.
+ *
+ * <p>Every {@link Row} is turned into one string by {@link CsvFormatter} and written with
+ * {@code writeAsText}; {@code null} fields become empty strings.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+	private final String path;
+	private final String fieldDelim;
+	private final int numFiles;
+	private final FileSystem.WriteMode writeMode;
+
+	// Only assigned on the copy returned by configure(); null until then.
+	private String[] fieldNames;
+	private TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files.
+	 *
+	 * @param path The output path to write the Table to.
+	 * @param fieldDelim The field delimiter; {@code null} falls back to {@code ","}.
+	 * @param numFiles The number of files to write to; values {@code <= 0} keep the default
+	 *                 parallelism.
+	 * @param writeMode The write mode to specify whether existing files are overwritten or not;
+	 *                  {@code null} uses the default of {@code writeAsText(path)}.
+	 */
+	public CsvTableSink(
+			String path,
+			String fieldDelim,
+			int numFiles,
+			FileSystem.WriteMode writeMode) {
+		this.path = path;
+		this.fieldDelim = fieldDelim;
+		this.numFiles = numFiles;
+		this.writeMode = writeMode;
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files using comma as field delimiter, with default
+	 * parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 */
+	public CsvTableSink(String path) {
+		this(path, ",");
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files, with default parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 */
+	public CsvTableSink(String path, String fieldDelim) {
+		this(path, fieldDelim, -1, null);
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		MapOperator<Row, String> csvRows =
+			dataSet.map(new CsvFormatter(effectiveFieldDelim()));
+
+		DataSink<String> sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		// Pin both the formatter and the sink to numFiles parallel instances.
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+		SingleOutputStreamOperator<String> csvRows =
+			dataStream.map(new CsvFormatter(effectiveFieldDelim()));
+
+		DataStreamSink<String> sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		// Pin both the formatter and the sink to numFiles parallel instances.
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+
+		return sink;
+	}
+
+	/** Delegates to {@link #consumeDataStream(DataStream)}, discarding the returned sink. */
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	/** Returns a new sink with the same settings that carries the given field names and types. */
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		CsvTableSink configuredSink = new CsvTableSink(path, fieldDelim, numFiles, writeMode);
+		configuredSink.fieldNames = fieldNames;
+		configuredSink.fieldTypes = fieldTypes;
+		return configuredSink;
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	/** Returns the configured field delimiter, or {@code ","} if none was given. */
+	private String effectiveFieldDelim() {
+		return fieldDelim == null ? "," : fieldDelim;
+	}
+
+	/**
+	 * Formats a {@link Row} into a String with fields separated by the field delimiter.
+	 *
+	 * <p>A delimiter is written between every pair of adjacent fields regardless of their values,
+	 * and {@code null} fields are written as empty strings. For example, the row
+	 * {@code (null, "a")} with delimiter {@code ","} becomes {@code ",a"}.
+	 */
+	public static class CsvFormatter implements MapFunction<Row, String> {
+		private static final long serialVersionUID = 1L;
+
+		private final String fieldDelim;
+
+		/**
+		 * Constructor with field delimiter.
+		 *
+		 * @param fieldDelim The field delimiter.
+		 */
+		CsvFormatter(String fieldDelim) {
+			this.fieldDelim = fieldDelim;
+		}
+
+		@Override
+		public String map(Row row) {
+			StringBuilder builder = new StringBuilder();
+			for (int i = 0; i < row.getArity(); i++) {
+				// Decide by field position, not by buffer length: a leading null or empty field
+				// must still be followed by a delimiter, otherwise later columns shift left.
+				if (i > 0) {
+					builder.append(fieldDelim);
+				}
+				Object field = row.getField(i);
+				if (field != null) {
+					builder.append(field);
+				}
+			}
+			return builder.toString();
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
deleted file mode 100644
index 8037f4d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.utils.TableConnectorUtils
-
-/**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter
- * @param numFiles The number of files to write to
- * @param writeMode The write mode to specify whether existing files are overwritten or not.
- */
-class CsvTableSink(
- path: String,
- fieldDelim: Option[String],
- numFiles: Option[Int],
- writeMode: Option[WriteMode])
- extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {
-
- /**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter, ',' by default.
- */
- def this(path: String, fieldDelim: String = ",") {
- this(path, Some(fieldDelim), None, None)
- }
-
- /**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter.
- * @param numFiles The number of files to write to.
- * @param writeMode The write mode to specify whether existing files are overwritten or not.
- */
- def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
- this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
- }
-
- override def emitDataSet(dataSet: DataSet[Row]): Unit = {
- val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
- if (numFiles.isDefined) {
- csvRows.setParallelism(numFiles.get)
- }
-
- val sink = writeMode match {
- case None => csvRows.writeAsText(path)
- case Some(wm) => csvRows.writeAsText(path, wm)
- }
-
- if (numFiles.isDefined) {
- sink.setParallelism(numFiles.get)
- }
-
- sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
- }
-
- override def emitDataStream(dataStream: DataStream[Row]): Unit = {
- val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
- if (numFiles.isDefined) {
- csvRows.setParallelism(numFiles.get)
- }
-
- val sink = writeMode match {
- case None => csvRows.writeAsText(path)
- case Some(wm) => csvRows.writeAsText(path, wm)
- }
-
- if (numFiles.isDefined) {
- sink.setParallelism(numFiles.get)
- }
-
- sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
- }
-
- override protected def copy: TableSinkBase[Row] = {
- new CsvTableSink(path, fieldDelim, numFiles, writeMode)
- }
-
- override def getOutputType: TypeInformation[Row] = {
- new RowTypeInfo(getFieldTypes: _*)
- }
-}
-
-/**
- * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
- *
- * @param fieldDelim The field delimiter.
- */
-class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
- override def map(row: Row): String = {
-
- val builder = new StringBuilder
-
- // write first value
- val v = row.getField(0)
- if (v != null) {
- builder.append(v.toString)
- }
-
- // write following values
- for (i <- 1 until row.getArity) {
- builder.append(fieldDelim)
- val v = row.getField(i)
- if (v != null) {
- builder.append(v.toString)
- }
- }
- builder.mkString
- }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d64df26..1e88e01 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -54,7 +54,7 @@ class TableSinkITCase(
tEnv.registerTableSink(
"testSink",
- new CsvTableSink(path, fieldDelim = "|").configure(
+ new CsvTableSink(path, "|").configure(
Array[String]("c", "b"), Array[TypeInformation[_]](Types.STRING, Types.LONG)))
val input = CollectionDataSets.get3TupleDataSet(env)