You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/03 09:46:28 UTC

[flink] branch master updated (1123ded -> c5ccbfe)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1123ded  [FLINK-13070][table-planner-blink] Remove TableImpl from blink planner and use api.internal.TableImpl instead
     new 635cd55  [FLINK-12977][table] Port CsvTableSource to api-java-bridge
     new 981a54d  [FLINK-12977][table] Port CsvTableSink to api-java-bridge
     new c5ccbfe  [FLINK-12978][table] Support LookupableTableSource for CsvTableSource

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-python/pyflink/table/sinks.py                |  14 +-
 .../org/apache/flink/table/sinks/CsvTableSink.java | 187 +++++++
 .../apache/flink/table/sources/CsvTableSource.java | 552 +++++++++++++++++++++
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  53 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |  54 +-
 .../apache/flink/table/util/testTableSources.scala |  90 ++++
 .../apache/flink/table/sinks/CsvTableSink.scala    | 140 ------
 .../flink/table/sources/CsvTableSource.scala       | 364 --------------
 .../runtime/batch/table/TableSinkITCase.scala      |   2 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  25 +-
 12 files changed, 951 insertions(+), 534 deletions(-)
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala


[flink] 01/03: [FLINK-12977][table] Port CsvTableSource to api-java-bridge

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 635cd554e94e2c87858785f7e1227aa804eedc5e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jun 25 19:40:48 2019 +0800

    [FLINK-12977][table] Port CsvTableSource to api-java-bridge
    
    This closes #8872
---
 .../apache/flink/table/sources/CsvTableSource.java | 431 +++++++++++++++++++++
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  22 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |  24 +-
 .../apache/flink/table/util/testTableSources.scala |  45 +++
 .../flink/table/sources/CsvTableSource.scala       | 364 -----------------
 .../runtime/stream/sql/TableSourceITCase.scala     |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  25 +-
 8 files changed, 532 insertions(+), 383 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
new file mode 100644
index 0000000..160bc9a
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource
+	implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+	private final CsvInputFormatConfig config;
+
+	/**
+	 * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+	 * a (logically) unlimited number of fields.
+	 *
+	 * @param path       The path to the CSV file.
+	 * @param fieldNames The names of the table fields.
+	 * @param fieldTypes The types of the table fields.
+	 */
+	public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+			null, false, null, false);
+	}
+
+	/**
+	 * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+	 * a (logically) unlimited number of fields.
+	 *
+	 * @param path            The path to the CSV file.
+	 * @param fieldNames      The names of the table fields.
+	 * @param fieldTypes      The types of the table fields.
+	 * @param fieldDelim      The field delimiter, "," by default.
+	 * @param lineDelim       The row delimiter, "\n" by default.
+	 * @param quoteCharacter  An optional quote character for String values, null by default.
+	 * @param ignoreFirstLine Flag to ignore the first line, false by default.
+	 * @param ignoreComments  An optional prefix to indicate comments, null by default.
+	 * @param lenient         Flag to skip records with parse error instead to fail, false by
+	 *                        default.
+	 */
+	public CsvTableSource(
+		String path,
+		String[] fieldNames,
+		TypeInformation<?>[] fieldTypes,
+		String fieldDelim,
+		String lineDelim,
+		Character quoteCharacter,
+		boolean ignoreFirstLine,
+		String ignoreComments,
+		boolean lenient) {
+
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			fieldDelim, lineDelim,
+			quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+	}
+
+	/**
+	 * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+	 * a (logically) unlimited number of fields.
+	 *
+	 * @param path            The path to the CSV file.
+	 * @param fieldNames      The names of the table fields.
+	 * @param fieldTypes      The types of the table fields.
+	 * @param selectedFields  The fields which will be read and returned by the table source. If
+	 *                        None, all fields are returned.
+	 * @param fieldDelim      The field delimiter, "," by default.
+	 * @param lineDelim       The row delimiter, "\n" by default.
+	 * @param quoteCharacter  An optional quote character for String values, null by default.
+	 * @param ignoreFirstLine Flag to ignore the first line, false by default.
+	 * @param ignoreComments  An optional prefix to indicate comments, null by default.
+	 * @param lenient         Flag to skip records with parse error instead to fail, false by
+	 *                        default.
+	 */
+	public CsvTableSource(
+		String path,
+		String[] fieldNames,
+		TypeInformation<?>[] fieldTypes,
+		int[] selectedFields,
+		String fieldDelim,
+		String lineDelim,
+		Character quoteCharacter,
+		boolean ignoreFirstLine,
+		String ignoreComments,
+		boolean lenient) {
+		this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields,
+			fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient));
+	}
+
+	private CsvTableSource(CsvInputFormatConfig config) {
+		this.config = config;
+	}
+
+	/**
+	 * Return a new builder that builds a CsvTableSource. For example:
+	 * <pre>
+	 * CsvTableSource source = new CsvTableSource.builder()
+	 *     .path("/path/to/your/file.csv")
+	 *     .field("myfield", Types.STRING)
+	 *     .field("myfield2", Types.INT)
+	 *     .build();
+	 * </pre>
+	 *
+	 * @return a new builder to build a CsvTableSource
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames());
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return new TableSchema(config.fieldNames, config.fieldTypes);
+	}
+
+	@Override
+	public CsvTableSource projectFields(int[] fields) {
+		if (fields.length == 0) {
+			fields = new int[]{0};
+		}
+		return new CsvTableSource(config.select(fields));
+	}
+
+	@Override
+	public boolean isBounded() {
+		return true;
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+	}
+
+	@Override
+	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+		return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+	}
+
+	@Override
+	public String explainSource() {
+		String[] fields = config.getSelectedFieldNames();
+		return "CsvTableSource(read fields: " + String.join(", ", fields) + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CsvTableSource that = (CsvTableSource) o;
+		return Objects.equals(config, that.config);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(config);
+	}
+
+	/**
+	 * A builder for creating CsvTableSource instances.
+	 */
+	public static class Builder {
+		private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>();
+		private Character quoteCharacter;
+		private String path;
+		private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+		private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+		private boolean isIgnoreFirstLine = false;
+		private String commentPrefix;
+		private boolean lenient = false;
+
+		/**
+		 * Sets the path to the CSV file. Required.
+		 *
+		 * @param path the path to the CSV file
+		 */
+		public Builder path(String path) {
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the field delimiter, "," by default.
+		 *
+		 * @param delim the field delimiter
+		 */
+		public Builder fieldDelimiter(String delim) {
+			this.fieldDelim = delim;
+			return this;
+		}
+
+		/**
+		 * Sets the line delimiter, "\n" by default.
+		 *
+		 * @param delim the line delimiter
+		 */
+		public Builder lineDelimiter(String delim) {
+			this.lineDelim = delim;
+			return this;
+		}
+
+		/**
+		 * Adds a field with the field name and the type information. Required. This method can be
+		 * called multiple times. The call order of this method defines also the order of the fields
+		 * in a row.
+		 *
+		 * @param fieldName the field name
+		 * @param fieldType the type information of the field
+		 */
+		public Builder field(String fieldName, TypeInformation<?> fieldType) {
+			if (schema.containsKey(fieldName)) {
+				throw new IllegalArgumentException("Duplicate field name " + fieldName);
+			}
+			schema.put(fieldName, fieldType);
+			return this;
+		}
+
+		/**
+		 * Sets a quote character for String values, null by default.
+		 *
+		 * @param quote the quote character
+		 */
+		public Builder quoteCharacter(Character quote) {
+			this.quoteCharacter = quote;
+			return this;
+		}
+
+		/**
+		 * Sets a prefix to indicate comments, null by default.
+		 *
+		 * @param prefix the prefix to indicate comments
+		 */
+		public Builder commentPrefix(String prefix) {
+			this.commentPrefix = prefix;
+			return this;
+		}
+
+		/**
+		 * Ignore the first line. Not skip the first line by default.
+		 */
+		public Builder ignoreFirstLine() {
+			this.isIgnoreFirstLine = true;
+			return this;
+		}
+
+		/**
+		 * Skip records with parse error instead to fail. Throw an exception by default.
+		 */
+		public Builder ignoreParseErrors() {
+			this.lenient = true;
+			return this;
+		}
+
+		/**
+		 * Apply the current values and constructs a newly-created CsvTableSource.
+		 *
+		 * @return a newly-created CsvTableSource
+		 */
+		public CsvTableSource build() {
+			if (path == null) {
+				throw new IllegalArgumentException("Path must be defined.");
+			}
+			if (schema.isEmpty()) {
+				throw new IllegalArgumentException("Fields can not be empty.");
+			}
+			return new CsvTableSource(
+				path,
+				schema.keySet().toArray(new String[0]),
+				schema.values().toArray(new TypeInformation<?>[0]),
+				fieldDelim,
+				lineDelim,
+				quoteCharacter,
+				isIgnoreFirstLine,
+				commentPrefix,
+				lenient);
+		}
+
+	}
+
+	private static class CsvInputFormatConfig implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private final String path;
+		private final String[] fieldNames;
+		private final TypeInformation<?>[] fieldTypes;
+		private final int[] selectedFields;
+
+		private final String fieldDelim;
+		private final String lineDelim;
+		private final Character quoteCharacter;
+		private final boolean ignoreFirstLine;
+		private final String ignoreComments;
+		private final boolean lenient;
+
+		CsvInputFormatConfig(
+			String path,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes,
+			int[] selectedFields,
+			String fieldDelim,
+			String lineDelim,
+			Character quoteCharacter,
+			boolean ignoreFirstLine,
+			String ignoreComments,
+			boolean lenient) {
+
+			this.path = path;
+			this.fieldNames = fieldNames;
+			this.fieldTypes = fieldTypes;
+			this.selectedFields = selectedFields;
+			this.fieldDelim = fieldDelim;
+			this.lineDelim = lineDelim;
+			this.quoteCharacter = quoteCharacter;
+			this.ignoreFirstLine = ignoreFirstLine;
+			this.ignoreComments = ignoreComments;
+			this.lenient = lenient;
+		}
+
+		String[] getSelectedFieldNames() {
+			String[] selectedFieldNames = new String[selectedFields.length];
+			for (int i = 0; i < selectedFields.length; i++) {
+				selectedFieldNames[i] = fieldNames[selectedFields[i]];
+			}
+			return selectedFieldNames;
+		}
+
+		TypeInformation<?>[] getSelectedFieldTypes() {
+			TypeInformation<?>[] selectedFieldTypes = new TypeInformation<?>[selectedFields.length];
+			for (int i = 0; i < selectedFields.length; i++) {
+				selectedFieldTypes[i] = fieldTypes[selectedFields[i]];
+			}
+			return selectedFieldTypes;
+		}
+
+		RowCsvInputFormat createInputFormat() {
+			RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+				new Path(path),
+				getSelectedFieldTypes(),
+				lineDelim,
+				fieldDelim,
+				selectedFields);
+			inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine);
+			inputFormat.setCommentPrefix(ignoreComments);
+			inputFormat.setLenient(lenient);
+			if (quoteCharacter != null) {
+				inputFormat.enableQuotedStringParsing(quoteCharacter);
+			}
+			return inputFormat;
+		}
+
+		CsvInputFormatConfig select(int[] fields) {
+			return new CsvInputFormatConfig(path, fieldNames, fieldTypes, fields,
+				fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			CsvInputFormatConfig that = (CsvInputFormatConfig) o;
+			return ignoreFirstLine == that.ignoreFirstLine &&
+				lenient == that.lenient &&
+				Objects.equals(path, that.path) &&
+				Arrays.equals(fieldNames, that.fieldNames) &&
+				Arrays.equals(fieldTypes, that.fieldTypes) &&
+				Arrays.equals(selectedFields, that.selectedFields) &&
+				Objects.equals(fieldDelim, that.fieldDelim) &&
+				Objects.equals(lineDelim, that.lineDelim) &&
+				Objects.equals(quoteCharacter, that.quoteCharacter) &&
+				Objects.equals(ignoreComments, that.ignoreComments);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = Objects.hash(path, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine,
+				ignoreComments, lenient);
+			result = 31 * result + Arrays.hashCode(fieldNames);
+			result = 31 * result + Arrays.hashCode(fieldTypes);
+			result = 31 * result + Arrays.hashCode(selectedFields);
+			return result;
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 3a5edd3..e4bcc8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -116,7 +116,7 @@ abstract class StreamTableEnvironment(
     // TODO TableSourceUtil.validateTableSource(tableSource)
     tableSource match {
       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded => // ok
+      case _: StreamTableSource[_] => // StreamEnv can handle both bounded and unbounded ok
       // TODO `TableSourceUtil.hasRowtimeAttribute` depends on [Expression]
       // check that event-time is enabled if table source includes rowtime attributes
       // if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 39c59ed..4abf691 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.api.{DataTypes, TableConfigOptions, TableSchema, T
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
 import org.apache.flink.table.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource}
+import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
@@ -150,4 +149,23 @@ class TableSourceITCase extends BatchTestBase {
         row(8, "Record_8"))
     )
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("csvTable", csvTable)
+    checkResult(
+      "SELECT id, `first`, `last`, score FROM csvTable",
+      Seq(
+        row(1, "Mike", "Smith", 12.3),
+        row(2, "Bob", "Taylor", 45.6),
+        row(3, "Sam", "Miller", 7.89),
+        row(4, "Peter", "Smith", 0.12),
+        row(5, "Liz", "Williams", 34.5),
+        row(6, "Sally", "Miller", 6.78),
+        row(7, "Alice", "Smith", 90.1),
+        row(8, "Kelly", "Williams", 2.34)
+      )
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index b1a2fc4..9f4c2c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -26,12 +26,13 @@ import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
 import org.apache.flink.table.util._
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.Test
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
+import scala.collection.mutable
+
 class TableSourceITCase extends StreamingTestBase {
 
   @Test
@@ -314,4 +315,25 @@ class TableSourceITCase extends StreamingTestBase {
     val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("persons", csvTable)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(
+      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+      .toAppendStream[Row]
+      .addSink(sink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 4c407f8..776d2c4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row
 
+import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.util
 import java.util.{Collections, List => JList}
 
@@ -40,6 +41,50 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+object TestTableSources {
+
+  def getPersonCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first", Types.STRING)
+      .field("id", Types.INT)
+      .field("score",Types.DOUBLE)
+      .field("last",Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
+  }
+
+  private def writeToTempFile(
+      contents: String,
+      filePrefix: String,
+      fileSuffix: String,
+      charset: String = "UTF-8"): String = {
+    val tempFile = File.createTempFile(filePrefix, fileSuffix)
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
+    tmpWriter.write(contents)
+    tmpWriter.close()
+    tempFile.getAbsolutePath
+  }
+}
+
 class TestTableSourceWithTime[T](
     isBatch: Boolean,
     tableSchema: TableSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
deleted file mode 100644
index f7215ee..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableException, TableSchema}
-
-import scala.collection.mutable
-
-/**
-  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-  * (logically) unlimited number of fields.
-  *
-  * @param path The path to the CSV file.
-  * @param fieldNames The names of the table fields.
-  * @param fieldTypes The types of the table fields.
-  * @param selectedFields The fields which will be read and returned by the table source.
-  *                       If None, all fields are returned.
-  * @param fieldDelim The field delimiter, "," by default.
-  * @param rowDelim The row delimiter, "\n" by default.
-  * @param quoteCharacter An optional quote character for String values, null by default.
-  * @param ignoreFirstLine Flag to ignore the first line, false by default.
-  * @param ignoreComments An optional prefix to indicate comments, null by default.
-  * @param lenient Flag to skip records with parse error instead to fail, false by default.
-  */
-class CsvTableSource private (
-    private val path: String,
-    private val fieldNames: Array[String],
-    private val fieldTypes: Array[TypeInformation[_]],
-    private val selectedFields: Array[Int],
-    private val fieldDelim: String,
-    private val rowDelim: String,
-    private val quoteCharacter: Character,
-    private val ignoreFirstLine: Boolean,
-    private val ignoreComments: String,
-    private val lenient: Boolean)
-  extends BatchTableSource[Row]
-  with StreamTableSource[Row]
-  with ProjectableTableSource[Row] {
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    * @param fieldDelim The field delimiter, "," by default.
-    * @param rowDelim The row delimiter, "\n" by default.
-    * @param quoteCharacter An optional quote character for String values, null by default.
-    * @param ignoreFirstLine Flag to ignore the first line, false by default.
-    * @param ignoreComments An optional prefix to indicate comments, null by default.
-    * @param lenient Flag to skip records with parse error instead to fail, false by default.
-    */
-  def this(
-    path: String,
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    quoteCharacter: Character = null,
-    ignoreFirstLine: Boolean = false,
-    ignoreComments: String = null,
-    lenient: Boolean = false) = {
-
-    this(
-      path,
-      fieldNames,
-      fieldTypes,
-      fieldTypes.indices.toArray, // initially, all fields are returned
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-
-  }
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    */
-  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
-    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
-  }
-
-  if (fieldNames.length != fieldTypes.length) {
-    throw new TableException("Number of field names and field types must be equal.")
-  }
-
-  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
-  private val selectedFieldNames = selectedFields.map(fieldNames(_))
-
-  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
-
-  /**
-    * Returns the data of the table as a [[DataSet]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
-  override def getReturnType: RowTypeInfo = returnType
-
-  /**
-    * Returns the data of the table as a [[DataStream]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the schema of the produced table. */
-  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
-
-  /** Returns a copy of [[TableSource]] with ability to project fields */
-  override def projectFields(fields: Array[Int]): CsvTableSource = {
-
-    val selectedFields = if (fields.isEmpty) Array(0) else fields
-
-    new CsvTableSource(
-      path,
-      fieldNames,
-      fieldTypes,
-      selectedFields,
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-  }
-
-  private def createCsvInput(): RowCsvInputFormat = {
-    val inputFormat = new RowCsvInputFormat(
-      new Path(path),
-      selectedFieldTypes,
-      rowDelim,
-      fieldDelim,
-      selectedFields)
-
-    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
-    inputFormat.setLenient(lenient)
-    if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter)
-    }
-    if (ignoreComments != null) {
-      inputFormat.setCommentPrefix(ignoreComments)
-    }
-
-    inputFormat
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CsvTableSource => returnType == that.returnType &&
-        path == that.path &&
-        fieldDelim == that.fieldDelim &&
-        rowDelim == that.rowDelim &&
-        quoteCharacter == that.quoteCharacter &&
-        ignoreFirstLine == that.ignoreFirstLine &&
-        ignoreComments == that.ignoreComments &&
-        lenient == that.lenient
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    returnType.hashCode()
-  }
-
-  override def explainSource(): String = {
-    s"CsvTableSource(" +
-      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
-  }
-}
-
-object CsvTableSource {
-
-  /**
-    * A builder for creating [[CsvTableSource]] instances.
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = new CsvTableSource.builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    *
-    */
-  class Builder {
-
-    private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
-      mutable.LinkedHashMap[String, TypeInformation[_]]()
-    private var quoteCharacter: Character = _
-    private var path: String = _
-    private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
-    private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
-    private var isIgnoreFirstLine: Boolean = false
-    private var commentPrefix: String = _
-    private var lenient: Boolean = false
-
-    /**
-      * Sets the path to the CSV file. Required.
-      *
-      * @param path the path to the CSV file
-      */
-    def path(path: String): Builder = {
-      this.path = path
-      this
-    }
-
-    /**
-      * Sets the field delimiter, "," by default.
-      *
-      * @param delim the field delimiter
-      */
-    def fieldDelimiter(delim: String): Builder = {
-      this.fieldDelim = delim
-      this
-    }
-
-    /**
-      * Sets the line delimiter, "\n" by default.
-      *
-      * @param delim the line delimiter
-      */
-    def lineDelimiter(delim: String): Builder = {
-      this.lineDelim = delim
-      this
-    }
-
-    /**
-      * Adds a field with the field name and the type information. Required.
-      * This method can be called multiple times. The call order of this method defines
-      * also the order of the fields in a row.
-      *
-      * @param fieldName the field name
-      * @param fieldType the type information of the field
-      */
-    def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
-      if (schema.contains(fieldName)) {
-        throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
-      }
-      schema += (fieldName -> fieldType)
-      this
-    }
-
-    /**
-      * Sets a quote character for String values, null by default.
-      *
-      * @param quote the quote character
-      */
-    def quoteCharacter(quote: Character): Builder = {
-      this.quoteCharacter = quote
-      this
-    }
-
-    /**
-      * Sets a prefix to indicate comments, null by default.
-      *
-      * @param prefix the prefix to indicate comments
-      */
-    def commentPrefix(prefix: String): Builder = {
-      this.commentPrefix = prefix
-      this
-    }
-
-    /**
-      * Ignore the first line. Not skip the first line by default.
-      */
-    def ignoreFirstLine(): Builder = {
-      this.isIgnoreFirstLine = true
-      this
-    }
-
-    /**
-      * Skip records with parse error instead to fail. Throw an exception by default.
-      */
-    def ignoreParseErrors(): Builder = {
-      this.lenient = true
-      this
-    }
-
-    /**
-      * Apply the current values and constructs a newly-created [[CsvTableSource]].
-      *
-      * @return a newly-created [[CsvTableSource]].
-      */
-    def build(): CsvTableSource = {
-      if (path == null) {
-        throw new IllegalArgumentException("Path must be defined.")
-      }
-      if (schema.isEmpty) {
-        throw new IllegalArgumentException("Fields can not be empty.")
-      }
-      new CsvTableSource(
-        path,
-        schema.keys.toArray,
-        schema.values.toArray,
-        fieldDelim,
-        lineDelim,
-        quoteCharacter,
-        isIgnoreFirstLine,
-        commentPrefix,
-        lenient)
-    }
-
-  }
-
-  /**
-    * Return a new builder that builds a [[CsvTableSource]].
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = CsvTableSource
-    *     .builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    * @return a new builder to build a [[CsvTableSource]]
-    */
-  def builder(): Builder = new Builder
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index fdf834c..ba55e53 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
+import _root_.scala.collection.mutable
 
 class TableSourceITCase extends AbstractTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index aff208c..0d633fe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -46,20 +46,17 @@ object CommonTestData {
     )
 
     val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
-    new CsvTableSource(
-      tempFilePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        Types.STRING,
-        Types.INT,
-        Types.DOUBLE,
-        Types.STRING
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first",Types.STRING)
+      .field("id",Types.INT)
+      .field("score",Types.DOUBLE)
+      .field("last",Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
   }
 
   def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {


[flink] 03/03: [FLINK-12978][table] Support LookupableTableSource for CsvTableSource

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5ccbfef31af1db7eebd18f275a4f47dab65c855
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Jul 3 10:37:45 2019 +0800

    [FLINK-12978][table] Support LookupableTableSource for CsvTableSource
---
 .../apache/flink/table/sources/CsvTableSource.java | 123 ++++++++++++++++++++-
 .../runtime/batch/sql/TableSourceITCase.scala      |  31 +++++-
 .../runtime/stream/sql/TableSourceITCase.scala     |  30 +++++
 .../apache/flink/table/util/testTableSources.scala |  47 +++++++-
 4 files changed, 228 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
index 160bc9a..1cd49ae 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -24,15 +24,23 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.RowCsvInputFormat;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.types.Row;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.IntStream;
 
@@ -41,7 +49,8 @@ import java.util.stream.IntStream;
  * (logically) unlimited number of fields.
  */
 public class CsvTableSource
-	implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> {
+	implements StreamTableSource<Row>, BatchTableSource<Row>, LookupableTableSource<Row>,
+	ProjectableTableSource<Row> {
 
 	private final CsvInputFormatConfig config;
 
@@ -178,6 +187,21 @@ public class CsvTableSource
 	}
 
 	@Override
+	public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+		return new CsvLookupFunction(config, lookupKeys);
+	}
+
+	@Override
+	public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
+		throw new UnsupportedOperationException("CSV do not support async lookup");
+	}
+
+	@Override
+	public boolean isAsyncEnabled() {
+		return false;
+	}
+
+	@Override
 	public String explainSource() {
 		String[] fields = config.getSelectedFieldNames();
 		return "CsvTableSource(read fields: " + String.join(", ", fields) + ")";
@@ -321,6 +345,103 @@ public class CsvTableSource
 
 	}
 
+	// ------------------------------------------------------------------------------------
+	// private utilities
+	// ------------------------------------------------------------------------------------
+
+	/**
+	 * LookupFunction to support lookup in CsvTableSource.
+	 */
+	public static class CsvLookupFunction extends TableFunction<Row> {
+		private static final long serialVersionUID = 1L;
+
+		private final CsvInputFormatConfig config;
+
+		private final List<Integer> sourceKeys = new ArrayList<>();
+		private final List<Integer> targetKeys = new ArrayList<>();
+		private final Map<Object, List<Row>> dataMap = new HashMap<>();
+
+		CsvLookupFunction(CsvInputFormatConfig config, String[] lookupKeys) {
+			this.config = config;
+
+			List<String> fields = Arrays.asList(config.getSelectedFieldNames());
+			for (int i = 0; i < lookupKeys.length; i++) {
+				sourceKeys.add(i);
+				int targetIdx = fields.indexOf(lookupKeys[i]);
+				assert targetIdx != -1;
+				targetKeys.add(targetIdx);
+			}
+		}
+
+		@Override
+		public TypeInformation<Row> getResultType() {
+			return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames());
+		}
+
+		@Override
+		public void open(FunctionContext context) throws Exception {
+			super.open(context);
+			TypeInformation<Row> rowType = getResultType();
+
+			RowCsvInputFormat inputFormat = config.createInputFormat();
+			FileInputSplit[] inputSplits = inputFormat.createInputSplits(1);
+			for (FileInputSplit split : inputSplits) {
+				inputFormat.open(split);
+				Row row = new Row(rowType.getArity());
+				while (true) {
+					Row r = inputFormat.nextRecord(row);
+					if (r == null) {
+						break;
+					} else {
+						Object key = getTargetKey(r);
+						List<Row> rows = dataMap.computeIfAbsent(key, k -> new ArrayList<>());
+						rows.add(Row.copy(r));
+					}
+				}
+				inputFormat.close();
+			}
+		}
+
+		public void eval(Object... values) {
+			Object srcKey = getSourceKey(Row.of(values));
+			if (dataMap.containsKey(srcKey)) {
+				for (Row row1 : dataMap.get(srcKey)) {
+					collect(row1);
+				}
+			}
+		}
+
+		private Object getSourceKey(Row source) {
+			return getKey(source, sourceKeys);
+		}
+
+		private Object getTargetKey(Row target) {
+			return getKey(target, targetKeys);
+		}
+
+		private Object getKey(Row input, List<Integer> keys) {
+			if (keys.size() == 1) {
+				int keyIdx = keys.get(0);
+				if (input.getField(keyIdx) != null) {
+					return input.getField(keyIdx);
+				}
+				return null;
+			} else {
+				Row key = new Row(keys.size());
+				for (int i = 0; i < keys.size(); i++) {
+					int keyIdx = keys.get(i);
+					key.setField(i, input.getField(keyIdx));
+				}
+				return key;
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+		}
+	}
+
 	private static class CsvInputFormatConfig implements Serializable {
 		private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 4abf691..464dcd1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
 import org.apache.flink.table.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
-import org.junit.{Before, Test}
+import org.junit.{Before, Ignore, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
@@ -168,4 +168,33 @@ class TableSourceITCase extends BatchTestBase {
       )
     )
   }
+
+  @Ignore("[FLINK-13075] Project pushdown rule shouldn't require" +
+    " the TableSource return a modified schema in blink planner")
+  @Test
+  def testLookupJoinCsvTemporalTable(): Unit = {
+    val orders = TestTableSources.getOrdersCsvTableSource
+    val rates = TestTableSources.getRatesCsvTableSource
+    tEnv.registerTableSource("orders", orders)
+    tEnv.registerTableSource("rates", rates)
+
+    val sql =
+      """
+        |SELECT o.amount, o.currency, r.rate
+        |FROM (SELECT *, PROCTIME() as proc FROM orders) AS o
+        |JOIN rates FOR SYSTEM_TIME AS OF o.proc AS r
+        |ON o.currency = r.currency
+      """.stripMargin
+
+    checkResult(
+      sql,
+      Seq(
+        row(2, "Euro", 119),
+        row(1, "US Dollar", 102),
+        row(50, "Yen", 1),
+        row(3, "Euro", 119),
+        row(5, "US Dollar", 102)
+      )
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index 9f4c2c3..fa4922a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -336,4 +336,34 @@ class TableSourceITCase extends StreamingTestBase {
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testLookupJoinCsvTemporalTable(): Unit = {
+    val orders = TestTableSources.getOrdersCsvTableSource
+    val rates = TestTableSources.getRatesCsvTableSource
+    tEnv.registerTableSource("orders", orders)
+    tEnv.registerTableSource("rates", rates)
+
+    val sql =
+      """
+        |SELECT o.amount, o.currency, r.rate
+        |FROM (SELECT *, PROCTIME() as proc FROM orders) AS o
+        |JOIN rates FOR SYSTEM_TIME AS OF o.proc AS r
+        |ON o.currency = r.currency
+      """.stripMargin
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "2,Euro,119",
+      "1,US Dollar,102",
+      "50,Yen,1",
+      "3,Euro,119",
+      "5,US Dollar,102"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 776d2c4..54d14e4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -57,7 +57,10 @@ object TestTableSources {
       "Kelly#8#2.34#Williams"
     )
 
-    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
+    val tempFilePath = writeToTempFile(
+      csvRecords.mkString("$"),
+      "csv-test",
+      "tmp")
     CsvTableSource.builder()
       .path(tempFilePath)
       .field("first", Types.STRING)
@@ -71,6 +74,48 @@ object TestTableSources {
       .build()
   }
 
+  def getOrdersCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "2,Euro,2",
+      "1,US Dollar,3",
+      "50,Yen,4",
+      "3,Euro,5",
+      "5,US Dollar,6"
+    )
+    val tempFilePath = writeToTempFile(
+      csvRecords.mkString("$"),
+      "csv-order-test",
+      "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("amount", Types.LONG)
+      .field("currency", Types.STRING)
+      .field("ts",Types.LONG)
+      .fieldDelimiter(",")
+      .lineDelimiter("$")
+      .build()
+  }
+
+  def getRatesCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "US Dollar,102",
+      "Yen,1",
+      "Euro,119",
+      "RMB,702"
+    )
+    val tempFilePath = writeToTempFile(
+      csvRecords.mkString("$"),
+      "csv-rate-test",
+      "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("currency", Types.STRING)
+      .field("rate", Types.LONG)
+      .fieldDelimiter(",")
+      .lineDelimiter("$")
+      .build()
+  }
+
   private def writeToTempFile(
       contents: String,
       filePrefix: String,


[flink] 02/03: [FLINK-12977][table] Port CsvTableSink to api-java-bridge

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 981a54d6718753ca5b2cd68259b72b4ecea9008a
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jun 26 11:46:53 2019 +0800

    [FLINK-12977][table] Port CsvTableSink to api-java-bridge
---
 flink-python/pyflink/table/sinks.py                |  14 +-
 .../org/apache/flink/table/sinks/CsvTableSink.java | 187 +++++++++++++++++++++
 .../apache/flink/table/sinks/CsvTableSink.scala    | 140 ---------------
 .../runtime/batch/table/TableSinkITCase.scala      |   2 +-
 4 files changed, 193 insertions(+), 150 deletions(-)

diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 4aa968f..deb5b45 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -57,24 +57,20 @@ class CsvTableSink(TableSink):
                        and :data:`WriteMode.OVERWRITE`.
     """
 
-    def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1,
+    def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1,
                  write_mode=None):
         # type: (list[str], list[DataType], str, str, int, int) -> None
         gateway = get_gateway()
         if write_mode == WriteMode.NO_OVERWRITE:
-            j_write_mode = gateway.jvm.scala.Option.apply(
-                gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE)
+            j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE
         elif write_mode == WriteMode.OVERWRITE:
-            j_write_mode = gateway.jvm.scala.Option.apply(
-                gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
+            j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
         elif write_mode is None:
-            j_write_mode = gateway.jvm.scala.Option.empty()
+            j_write_mode = None
         else:
             raise Exception('Unsupported write_mode: %s' % write_mode)
-        j_some_field_delimiter = gateway.jvm.scala.Option.apply(field_delimiter)
-        j_some_num_files = gateway.jvm.scala.Option.apply(num_files)
         j_csv_table_sink = gateway.jvm.CsvTableSink(
-            path, j_some_field_delimiter, j_some_num_files, j_write_mode)
+            path, field_delimiter, num_files, j_write_mode)
         j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
         j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
                                         [_to_java_type(field_type) for field_type in field_types])
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
new file mode 100644
index 0000000..f950f4d
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple {@link TableSink} to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+	private String path;
+	private String fieldDelim;
+	private int numFiles = -1;
+	private FileSystem.WriteMode writeMode;
+
+	private String[] fieldNames;
+	private TypeInformation<?>[] fieldTypes;
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files.
+	 *
+	 * @param path       The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 * @param numFiles   The number of files to write to
+	 * @param writeMode  The write mode to specify whether existing files are overwritten or not.
+	 */
+	public CsvTableSink(
+		String path,
+		String fieldDelim,
+		int numFiles,
+		FileSystem.WriteMode writeMode) {
+		this.path = path;
+		this.fieldDelim = fieldDelim;
+		this.numFiles = numFiles;
+		this.writeMode = writeMode;
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files using comma as field delimiter, with default
+	 * parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 */
+	public CsvTableSink(String path) {
+		this(path, ",");
+	}
+
+	/**
+	 * A simple {@link TableSink} to emit data as CSV files, with default parallelism and write mode.
+	 *
+	 * @param path       The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 */
+	public CsvTableSink(String path, String fieldDelim) {
+		this(path, fieldDelim, -1, null);
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		MapOperator<Row, String> csvRows =
+			dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
+
+		DataSink<String> sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+	}
+
+	@Override
+	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+		SingleOutputStreamOperator<String> csvRows =
+			dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
+
+		DataStreamSink<String> sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+
+		return sink;
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		CsvTableSink configuredSink = new CsvTableSink(path, fieldDelim, numFiles, writeMode);
+		configuredSink.fieldNames = fieldNames;
+		configuredSink.fieldTypes = fieldTypes;
+		return configuredSink;
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	/**
+	 * Formats a Row into a String with fields separated by the field delimiter.
+	 */
+	public static class CsvFormatter implements MapFunction<Row, String> {
+		private static final long serialVersionUID = 1L;
+
+		private final String fieldDelim;
+
+		/**
+		 * Constructor with field delimiter.
+		 *
+		 * @param fieldDelim The field delimiter.
+		 */
+		CsvFormatter(String fieldDelim) {
+			this.fieldDelim = fieldDelim;
+		}
+
+		@Override
+		public String map(Row row) {
+			StringBuilder builder = new StringBuilder();
+			Object o;
+			for (int i = 0; i < row.getArity(); i++) {
+				if (builder.length() != 0) {
+					builder.append(fieldDelim);
+				}
+				if ((o = row.getField(i)) != null) {
+					builder.append(o);
+				}
+			}
+			return builder.toString();
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
deleted file mode 100644
index 8037f4d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.utils.TableConnectorUtils
-
-/**
-  * A simple [[TableSink]] to emit data as CSV files.
-  *
-  * @param path The output path to write the Table to.
-  * @param fieldDelim The field delimiter
-  * @param numFiles The number of files to write to
-  * @param writeMode The write mode to specify whether existing files are overwritten or not.
-  */
-class CsvTableSink(
-    path: String,
-    fieldDelim: Option[String],
-    numFiles: Option[Int],
-    writeMode: Option[WriteMode])
-  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter, ',' by default.
-    */
-  def this(path: String, fieldDelim: String = ",") {
-    this(path, Some(fieldDelim), None, None)
-  }
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter.
-    * @param numFiles The number of files to write to.
-    * @param writeMode The write mode to specify whether existing files are overwritten or not.
-    */
-  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
-    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
-  }
-
-  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
-    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
-    if (numFiles.isDefined) {
-      csvRows.setParallelism(numFiles.get)
-    }
-
-    val sink = writeMode match {
-      case None => csvRows.writeAsText(path)
-      case Some(wm) => csvRows.writeAsText(path, wm)
-    }
-
-    if (numFiles.isDefined) {
-      sink.setParallelism(numFiles.get)
-    }
-
-    sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
-  }
-
-  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
-    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))
-
-    if (numFiles.isDefined) {
-      csvRows.setParallelism(numFiles.get)
-    }
-
-    val sink = writeMode match {
-      case None => csvRows.writeAsText(path)
-      case Some(wm) => csvRows.writeAsText(path, wm)
-    }
-
-    if (numFiles.isDefined) {
-      sink.setParallelism(numFiles.get)
-    }
-
-    sink.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
-  }
-
-  override protected def copy: TableSinkBase[Row] = {
-    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
-  }
-
-  override def getOutputType: TypeInformation[Row] = {
-    new RowTypeInfo(getFieldTypes: _*)
-  }
-}
-
-/**
-  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
-  *
-  * @param fieldDelim The field delimiter.
-  */
-class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
-  override def map(row: Row): String = {
-
-    val builder = new StringBuilder
-
-    // write first value
-    val v = row.getField(0)
-    if (v != null) {
-      builder.append(v.toString)
-    }
-
-    // write following values
-    for (i <- 1 until row.getArity) {
-      builder.append(fieldDelim)
-      val v = row.getField(i)
-      if (v != null) {
-        builder.append(v.toString)
-      }
-    }
-    builder.mkString
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d64df26..1e88e01 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -54,7 +54,7 @@ class TableSinkITCase(
 
     tEnv.registerTableSink(
       "testSink",
-      new CsvTableSink(path, fieldDelim = "|").configure(
+      new CsvTableSink(path, "|").configure(
         Array[String]("c", "b"), Array[TypeInformation[_]](Types.STRING, Types.LONG)))
 
     val input = CollectionDataSets.get3TupleDataSet(env)