You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/03 09:46:29 UTC
[flink] 01/03: [FLINK-12977][table] Port CsvTableSource to
api-java-bridge
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 635cd554e94e2c87858785f7e1227aa804eedc5e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jun 25 19:40:48 2019 +0800
[FLINK-12977][table] Port CsvTableSource to api-java-bridge
This closes #8872
---
.../apache/flink/table/sources/CsvTableSource.java | 431 +++++++++++++++++++++
.../flink/table/api/StreamTableEnvironment.scala | 2 +-
.../runtime/batch/sql/TableSourceITCase.scala | 22 +-
.../runtime/stream/sql/TableSourceITCase.scala | 24 +-
.../apache/flink/table/util/testTableSources.scala | 45 +++
.../flink/table/sources/CsvTableSource.scala | 364 -----------------
.../runtime/stream/sql/TableSourceITCase.scala | 2 +-
.../flink/table/runtime/utils/CommonTestData.scala | 25 +-
8 files changed, 532 insertions(+), 383 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
new file mode 100644
index 0000000..160bc9a
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource
+ implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+ private final CsvInputFormatConfig config;
+
+ /**
+ * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+ * a (logically) unlimited number of fields.
+ *
+ * @param path The path to the CSV file.
+ * @param fieldNames The names of the table fields.
+ * @param fieldTypes The types of the table fields.
+ */
+ public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ this(path, fieldNames, fieldTypes,
+ IntStream.range(0, fieldNames.length).toArray(),
+ CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ null, false, null, false);
+ }
+
+ /**
+ * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+ * a (logically) unlimited number of fields.
+ *
+ * @param path The path to the CSV file.
+ * @param fieldNames The names of the table fields.
+ * @param fieldTypes The types of the table fields.
+ * @param fieldDelim The field delimiter, "," by default.
+ * @param lineDelim The row delimiter, "\n" by default.
+ * @param quoteCharacter An optional quote character for String values, null by default.
+ * @param ignoreFirstLine Flag to ignore the first line, false by default.
+ * @param ignoreComments An optional prefix to indicate comments, null by default.
+ * @param lenient Flag to skip records with parse error instead to fail, false by
+ * default.
+ */
+ public CsvTableSource(
+ String path,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes,
+ String fieldDelim,
+ String lineDelim,
+ Character quoteCharacter,
+ boolean ignoreFirstLine,
+ String ignoreComments,
+ boolean lenient) {
+
+ this(path, fieldNames, fieldTypes,
+ IntStream.range(0, fieldNames.length).toArray(),
+ fieldDelim, lineDelim,
+ quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+ }
+
+ /**
+ * A {@link InputFormatTableSource} and {@link LookupableTableSource} for simple CSV files with
+ * a (logically) unlimited number of fields.
+ *
+ * @param path The path to the CSV file.
+ * @param fieldNames The names of the table fields.
+ * @param fieldTypes The types of the table fields.
+ * @param selectedFields The fields which will be read and returned by the table source. If
+ * None, all fields are returned.
+ * @param fieldDelim The field delimiter, "," by default.
+ * @param lineDelim The row delimiter, "\n" by default.
+ * @param quoteCharacter An optional quote character for String values, null by default.
+ * @param ignoreFirstLine Flag to ignore the first line, false by default.
+ * @param ignoreComments An optional prefix to indicate comments, null by default.
+ * @param lenient Flag to skip records with parse error instead to fail, false by
+ * default.
+ */
+ public CsvTableSource(
+ String path,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes,
+ int[] selectedFields,
+ String fieldDelim,
+ String lineDelim,
+ Character quoteCharacter,
+ boolean ignoreFirstLine,
+ String ignoreComments,
+ boolean lenient) {
+ this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields,
+ fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient));
+ }
+
+ private CsvTableSource(CsvInputFormatConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Return a new builder that builds a CsvTableSource. For example:
+ * <pre>
+ * CsvTableSource source = new CsvTableSource.builder()
+ * .path("/path/to/your/file.csv")
+ * .field("myfield", Types.STRING)
+ * .field("myfield2", Types.INT)
+ * .build();
+ * </pre>
+ *
+ * @return a new builder to build a CsvTableSource
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public TypeInformation<Row> getReturnType() {
+ return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames());
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return new TableSchema(config.fieldNames, config.fieldTypes);
+ }
+
+ @Override
+ public CsvTableSource projectFields(int[] fields) {
+ if (fields.length == 0) {
+ fields = new int[]{0};
+ }
+ return new CsvTableSource(config.select(fields));
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+ return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+ }
+
+ @Override
+ public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+ return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+ }
+
+ @Override
+ public String explainSource() {
+ String[] fields = config.getSelectedFieldNames();
+ return "CsvTableSource(read fields: " + String.join(", ", fields) + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CsvTableSource that = (CsvTableSource) o;
+ return Objects.equals(config, that.config);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(config);
+ }
+
+ /**
+ * A builder for creating CsvTableSource instances.
+ */
+ public static class Builder {
+ private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>();
+ private Character quoteCharacter;
+ private String path;
+ private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+ private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+ private boolean isIgnoreFirstLine = false;
+ private String commentPrefix;
+ private boolean lenient = false;
+
+ /**
+ * Sets the path to the CSV file. Required.
+ *
+ * @param path the path to the CSV file
+ */
+ public Builder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ /**
+ * Sets the field delimiter, "," by default.
+ *
+ * @param delim the field delimiter
+ */
+ public Builder fieldDelimiter(String delim) {
+ this.fieldDelim = delim;
+ return this;
+ }
+
+ /**
+ * Sets the line delimiter, "\n" by default.
+ *
+ * @param delim the line delimiter
+ */
+ public Builder lineDelimiter(String delim) {
+ this.lineDelim = delim;
+ return this;
+ }
+
+ /**
+ * Adds a field with the field name and the type information. Required. This method can be
+ * called multiple times. The call order of this method defines also the order of the fields
+ * in a row.
+ *
+ * @param fieldName the field name
+ * @param fieldType the type information of the field
+ */
+ public Builder field(String fieldName, TypeInformation<?> fieldType) {
+ if (schema.containsKey(fieldName)) {
+ throw new IllegalArgumentException("Duplicate field name " + fieldName);
+ }
+ schema.put(fieldName, fieldType);
+ return this;
+ }
+
+ /**
+ * Sets a quote character for String values, null by default.
+ *
+ * @param quote the quote character
+ */
+ public Builder quoteCharacter(Character quote) {
+ this.quoteCharacter = quote;
+ return this;
+ }
+
+ /**
+ * Sets a prefix to indicate comments, null by default.
+ *
+ * @param prefix the prefix to indicate comments
+ */
+ public Builder commentPrefix(String prefix) {
+ this.commentPrefix = prefix;
+ return this;
+ }
+
+ /**
+ * Ignore the first line. Not skip the first line by default.
+ */
+ public Builder ignoreFirstLine() {
+ this.isIgnoreFirstLine = true;
+ return this;
+ }
+
+ /**
+ * Skip records with parse error instead to fail. Throw an exception by default.
+ */
+ public Builder ignoreParseErrors() {
+ this.lenient = true;
+ return this;
+ }
+
+ /**
+ * Apply the current values and constructs a newly-created CsvTableSource.
+ *
+ * @return a newly-created CsvTableSource
+ */
+ public CsvTableSource build() {
+ if (path == null) {
+ throw new IllegalArgumentException("Path must be defined.");
+ }
+ if (schema.isEmpty()) {
+ throw new IllegalArgumentException("Fields can not be empty.");
+ }
+ return new CsvTableSource(
+ path,
+ schema.keySet().toArray(new String[0]),
+ schema.values().toArray(new TypeInformation<?>[0]),
+ fieldDelim,
+ lineDelim,
+ quoteCharacter,
+ isIgnoreFirstLine,
+ commentPrefix,
+ lenient);
+ }
+
+ }
+
+ private static class CsvInputFormatConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String path;
+ private final String[] fieldNames;
+ private final TypeInformation<?>[] fieldTypes;
+ private final int[] selectedFields;
+
+ private final String fieldDelim;
+ private final String lineDelim;
+ private final Character quoteCharacter;
+ private final boolean ignoreFirstLine;
+ private final String ignoreComments;
+ private final boolean lenient;
+
+ CsvInputFormatConfig(
+ String path,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes,
+ int[] selectedFields,
+ String fieldDelim,
+ String lineDelim,
+ Character quoteCharacter,
+ boolean ignoreFirstLine,
+ String ignoreComments,
+ boolean lenient) {
+
+ this.path = path;
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
+ this.selectedFields = selectedFields;
+ this.fieldDelim = fieldDelim;
+ this.lineDelim = lineDelim;
+ this.quoteCharacter = quoteCharacter;
+ this.ignoreFirstLine = ignoreFirstLine;
+ this.ignoreComments = ignoreComments;
+ this.lenient = lenient;
+ }
+
+ String[] getSelectedFieldNames() {
+ String[] selectedFieldNames = new String[selectedFields.length];
+ for (int i = 0; i < selectedFields.length; i++) {
+ selectedFieldNames[i] = fieldNames[selectedFields[i]];
+ }
+ return selectedFieldNames;
+ }
+
+ TypeInformation<?>[] getSelectedFieldTypes() {
+ TypeInformation<?>[] selectedFieldTypes = new TypeInformation<?>[selectedFields.length];
+ for (int i = 0; i < selectedFields.length; i++) {
+ selectedFieldTypes[i] = fieldTypes[selectedFields[i]];
+ }
+ return selectedFieldTypes;
+ }
+
+ RowCsvInputFormat createInputFormat() {
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+ new Path(path),
+ getSelectedFieldTypes(),
+ lineDelim,
+ fieldDelim,
+ selectedFields);
+ inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine);
+ inputFormat.setCommentPrefix(ignoreComments);
+ inputFormat.setLenient(lenient);
+ if (quoteCharacter != null) {
+ inputFormat.enableQuotedStringParsing(quoteCharacter);
+ }
+ return inputFormat;
+ }
+
+ CsvInputFormatConfig select(int[] fields) {
+ return new CsvInputFormatConfig(path, fieldNames, fieldTypes, fields,
+ fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CsvInputFormatConfig that = (CsvInputFormatConfig) o;
+ return ignoreFirstLine == that.ignoreFirstLine &&
+ lenient == that.lenient &&
+ Objects.equals(path, that.path) &&
+ Arrays.equals(fieldNames, that.fieldNames) &&
+ Arrays.equals(fieldTypes, that.fieldTypes) &&
+ Arrays.equals(selectedFields, that.selectedFields) &&
+ Objects.equals(fieldDelim, that.fieldDelim) &&
+ Objects.equals(lineDelim, that.lineDelim) &&
+ Objects.equals(quoteCharacter, that.quoteCharacter) &&
+ Objects.equals(ignoreComments, that.ignoreComments);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(path, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine,
+ ignoreComments, lenient);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ result = 31 * result + Arrays.hashCode(fieldTypes);
+ result = 31 * result + Arrays.hashCode(selectedFields);
+ return result;
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 3a5edd3..e4bcc8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -116,7 +116,7 @@ abstract class StreamTableEnvironment(
// TODO TableSourceUtil.validateTableSource(tableSource)
tableSource match {
// check for proper stream table source
- case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded => // ok
+ case _: StreamTableSource[_] => // StreamEnv can handle both bounded and unbounded ok
// TODO `TableSourceUtil.hasRowtimeAttribute` depends on [Expression]
// check that event-time is enabled if table source includes rowtime attributes
// if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 39c59ed..4abf691 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.api.{DataTypes, TableConfigOptions, TableSchema, T
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
import org.apache.flink.table.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource}
+import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources}
import org.apache.flink.types.Row
-
import org.junit.{Before, Test}
import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
@@ -150,4 +149,23 @@ class TableSourceITCase extends BatchTestBase {
row(8, "Record_8"))
)
}
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    // The source skips the header line and the '%' comment line, so all eight persons remain.
+    tEnv.registerTableSource("csvTable", TestTableSources.getPersonCsvTableSource)
+
+    val expectedRows = Seq(
+      row(1, "Mike", "Smith", 12.3),
+      row(2, "Bob", "Taylor", 45.6),
+      row(3, "Sam", "Miller", 7.89),
+      row(4, "Peter", "Smith", 0.12),
+      row(5, "Liz", "Williams", 34.5),
+      row(6, "Sally", "Miller", 6.78),
+      row(7, "Alice", "Smith", 90.1),
+      row(8, "Kelly", "Williams", 2.34))
+
+    checkResult("SELECT id, `first`, `last`, score FROM csvTable", expectedRows)
+  }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index b1a2fc4..9f4c2c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -26,12 +26,13 @@ import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
import org.apache.flink.table.util._
import org.apache.flink.types.Row
-
import org.junit.Assert._
import org.junit.Test
import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
+import scala.collection.mutable
+
class TableSourceITCase extends StreamingTestBase {
@Test
@@ -314,4 +315,25 @@ class TableSourceITCase extends StreamingTestBase {
val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    tEnv.registerTableSource("persons", TestTableSources.getPersonCsvTableSource)
+
+    // Only the persons with id 1, 2 and 3 pass the filter.
+    val filtered = tEnv.sqlQuery(
+      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+    val sink = new TestingAppendSink()
+    filtered.toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expectedRows = mutable.MutableList(
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
+    assertEquals(expectedRows.sorted, sink.getAppendResults.sorted)
+  }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 4c407f8..776d2c4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
import org.apache.flink.types.Row
+import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.util
import java.util.{Collections, List => JList}
@@ -40,6 +41,50 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
+object TestTableSources {
+
+  /**
+   * Creates a [[CsvTableSource]] over a temporary file holding eight person records with the
+   * fields first, id, score and last. The file uses '#' as field delimiter and a dollar sign
+   * as line delimiter. It starts with a header line and contains one comment line prefixed
+   * by '%'; the source is configured to skip both.
+   */
+  def getPersonCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first", Types.STRING)
+      .field("id", Types.INT)
+      .field("score", Types.DOUBLE)
+      .field("last", Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
+  }
+
+  /**
+   * Writes `contents` to a new temporary file that is deleted when the JVM exits.
+   *
+   * @return the absolute path of the written file
+   */
+  private def writeToTempFile(
+      contents: String,
+      filePrefix: String,
+      fileSuffix: String,
+      charset: String = "UTF-8"): String = {
+    val tempFile = File.createTempFile(filePrefix, fileSuffix)
+    tempFile.deleteOnExit()
+    val writer = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
+    // Close the writer (and its file handle) even if writing fails.
+    try {
+      writer.write(contents)
+    } finally {
+      writer.close()
+    }
+    tempFile.getAbsolutePath
+  }
+}
+
class TestTableSourceWithTime[T](
isBatch: Boolean,
tableSchema: TableSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
deleted file mode 100644
index f7215ee..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableException, TableSchema}
-
-import scala.collection.mutable
-
-/**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- * @param selectedFields The fields which will be read and returned by the table source.
- * If None, all fields are returned.
- * @param fieldDelim The field delimiter, "," by default.
- * @param rowDelim The row delimiter, "\n" by default.
- * @param quoteCharacter An optional quote character for String values, null by default.
- * @param ignoreFirstLine Flag to ignore the first line, false by default.
- * @param ignoreComments An optional prefix to indicate comments, null by default.
- * @param lenient Flag to skip records with parse error instead to fail, false by default.
- */
-class CsvTableSource private (
- private val path: String,
- private val fieldNames: Array[String],
- private val fieldTypes: Array[TypeInformation[_]],
- private val selectedFields: Array[Int],
- private val fieldDelim: String,
- private val rowDelim: String,
- private val quoteCharacter: Character,
- private val ignoreFirstLine: Boolean,
- private val ignoreComments: String,
- private val lenient: Boolean)
- extends BatchTableSource[Row]
- with StreamTableSource[Row]
- with ProjectableTableSource[Row] {
-
- /**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- * @param fieldDelim The field delimiter, "," by default.
- * @param rowDelim The row delimiter, "\n" by default.
- * @param quoteCharacter An optional quote character for String values, null by default.
- * @param ignoreFirstLine Flag to ignore the first line, false by default.
- * @param ignoreComments An optional prefix to indicate comments, null by default.
- * @param lenient Flag to skip records with parse error instead to fail, false by default.
- */
- def this(
- path: String,
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]],
- fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
- rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
- quoteCharacter: Character = null,
- ignoreFirstLine: Boolean = false,
- ignoreComments: String = null,
- lenient: Boolean = false) = {
-
- this(
- path,
- fieldNames,
- fieldTypes,
- fieldTypes.indices.toArray, // initially, all fields are returned
- fieldDelim,
- rowDelim,
- quoteCharacter,
- ignoreFirstLine,
- ignoreComments,
- lenient)
-
- }
-
- /**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- */
- def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
- this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
- CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
- }
-
- if (fieldNames.length != fieldTypes.length) {
- throw new TableException("Number of field names and field types must be equal.")
- }
-
- private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
- private val selectedFieldNames = selectedFields.map(fieldNames(_))
-
- private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
-
- /**
- * Returns the data of the table as a [[DataSet]] of [[Row]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
- execEnv.createInput(createCsvInput(), returnType).name(explainSource())
- }
-
- /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
- override def getReturnType: RowTypeInfo = returnType
-
- /**
- * Returns the data of the table as a [[DataStream]] of [[Row]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
- streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
- }
-
- /** Returns the schema of the produced table. */
- override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
-
- /** Returns a copy of [[TableSource]] with ability to project fields */
- override def projectFields(fields: Array[Int]): CsvTableSource = {
-
- val selectedFields = if (fields.isEmpty) Array(0) else fields
-
- new CsvTableSource(
- path,
- fieldNames,
- fieldTypes,
- selectedFields,
- fieldDelim,
- rowDelim,
- quoteCharacter,
- ignoreFirstLine,
- ignoreComments,
- lenient)
- }
-
- private def createCsvInput(): RowCsvInputFormat = {
- val inputFormat = new RowCsvInputFormat(
- new Path(path),
- selectedFieldTypes,
- rowDelim,
- fieldDelim,
- selectedFields)
-
- inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
- inputFormat.setLenient(lenient)
- if (quoteCharacter != null) {
- inputFormat.enableQuotedStringParsing(quoteCharacter)
- }
- if (ignoreComments != null) {
- inputFormat.setCommentPrefix(ignoreComments)
- }
-
- inputFormat
- }
-
- override def equals(other: Any): Boolean = other match {
- case that: CsvTableSource => returnType == that.returnType &&
- path == that.path &&
- fieldDelim == that.fieldDelim &&
- rowDelim == that.rowDelim &&
- quoteCharacter == that.quoteCharacter &&
- ignoreFirstLine == that.ignoreFirstLine &&
- ignoreComments == that.ignoreComments &&
- lenient == that.lenient
- case _ => false
- }
-
- override def hashCode(): Int = {
- returnType.hashCode()
- }
-
- override def explainSource(): String = {
- s"CsvTableSource(" +
- s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
- }
-}
-
-object CsvTableSource {
-
- /**
- * A builder for creating [[CsvTableSource]] instances.
- *
- * For example:
- *
- * {{{
- * val source: CsvTableSource = new CsvTableSource.builder()
- * .path("/path/to/your/file.csv")
- * .field("myfield", Types.STRING)
- * .field("myfield2", Types.INT)
- * .build()
- * }}}
- *
- */
- class Builder {
-
- private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
- mutable.LinkedHashMap[String, TypeInformation[_]]()
- private var quoteCharacter: Character = _
- private var path: String = _
- private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
- private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
- private var isIgnoreFirstLine: Boolean = false
- private var commentPrefix: String = _
- private var lenient: Boolean = false
-
- /**
- * Sets the path to the CSV file. Required.
- *
- * @param path the path to the CSV file
- */
- def path(path: String): Builder = {
- this.path = path
- this
- }
-
- /**
- * Sets the field delimiter, "," by default.
- *
- * @param delim the field delimiter
- */
- def fieldDelimiter(delim: String): Builder = {
- this.fieldDelim = delim
- this
- }
-
- /**
- * Sets the line delimiter, "\n" by default.
- *
- * @param delim the line delimiter
- */
- def lineDelimiter(delim: String): Builder = {
- this.lineDelim = delim
- this
- }
-
- /**
- * Adds a field with the field name and the type information. Required.
- * This method can be called multiple times. The call order of this method defines
- * also the order of the fields in a row.
- *
- * @param fieldName the field name
- * @param fieldType the type information of the field
- */
- def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
- if (schema.contains(fieldName)) {
- throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
- }
- schema += (fieldName -> fieldType)
- this
- }
-
- /**
- * Sets a quote character for String values, null by default.
- *
- * @param quote the quote character
- */
- def quoteCharacter(quote: Character): Builder = {
- this.quoteCharacter = quote
- this
- }
-
- /**
- * Sets a prefix to indicate comments, null by default.
- *
- * @param prefix the prefix to indicate comments
- */
- def commentPrefix(prefix: String): Builder = {
- this.commentPrefix = prefix
- this
- }
-
- /**
- * Ignore the first line. Not skip the first line by default.
- */
- def ignoreFirstLine(): Builder = {
- this.isIgnoreFirstLine = true
- this
- }
-
- /**
- * Skip records with parse error instead to fail. Throw an exception by default.
- */
- def ignoreParseErrors(): Builder = {
- this.lenient = true
- this
- }
-
- /**
- * Apply the current values and constructs a newly-created [[CsvTableSource]].
- *
- * @return a newly-created [[CsvTableSource]].
- */
- def build(): CsvTableSource = {
- if (path == null) {
- throw new IllegalArgumentException("Path must be defined.")
- }
- if (schema.isEmpty) {
- throw new IllegalArgumentException("Fields can not be empty.")
- }
- new CsvTableSource(
- path,
- schema.keys.toArray,
- schema.values.toArray,
- fieldDelim,
- lineDelim,
- quoteCharacter,
- isIgnoreFirstLine,
- commentPrefix,
- lenient)
- }
-
- }
-
- /**
- * Return a new builder that builds a [[CsvTableSource]].
- *
- * For example:
- *
- * {{{
- * val source: CsvTableSource = CsvTableSource
- * .builder()
- * .path("/path/to/your/file.csv")
- * .field("myfield", Types.STRING)
- * .field("myfield2", Types.INT)
- * .build()
- * }}}
- * @return a new builder to build a [[CsvTableSource]]
- */
- def builder(): Builder = new Builder
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index fdf834c..ba55e53 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
-import scala.collection.mutable
+import _root_.scala.collection.mutable
class TableSourceITCase extends AbstractTestBase {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index aff208c..0d633fe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -46,20 +46,17 @@ object CommonTestData {
)
val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
- new CsvTableSource(
- tempFilePath,
- Array("first", "id", "score", "last"),
- Array(
- Types.STRING,
- Types.INT,
- Types.DOUBLE,
- Types.STRING
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
+ CsvTableSource.builder()
+ .path(tempFilePath)
+ .field("first",Types.STRING)
+ .field("id",Types.INT)
+ .field("score",Types.DOUBLE)
+ .field("last",Types.STRING)
+ .fieldDelimiter("#")
+ .lineDelimiter("$")
+ .ignoreFirstLine()
+ .commentPrefix("%")
+ .build()
}
def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {