Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/03 09:46:29 UTC

[flink] 01/03: [FLINK-12977][table] Port CsvTableSource to api-java-bridge

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 635cd554e94e2c87858785f7e1227aa804eedc5e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jun 25 19:40:48 2019 +0800

    [FLINK-12977][table] Port CsvTableSource to api-java-bridge
    
    This closes #8872
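
For orientation, a minimal usage sketch of the builder API that this commit ports to Java; the file path, field names, and the tEnv variable are illustrative assumptions, not part of the commit:

    // assumes: import org.apache.flink.api.common.typeinfo.Types;
    CsvTableSource source = CsvTableSource.builder()
        .path("/tmp/people.csv")       // hypothetical sample file
        .field("id", Types.INT)        // call order of field() defines the row layout
        .field("name", Types.STRING)
        .fieldDelimiter("|")
        .ignoreFirstLine()
        .build();
    // tEnv is an existing TableEnvironment; registration mirrors the tests below
    tEnv.registerTableSource("people", source);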
---
 .../apache/flink/table/sources/CsvTableSource.java | 431 +++++++++++++++++++++
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  22 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |  24 +-
 .../apache/flink/table/util/testTableSources.scala |  45 +++
 .../flink/table/sources/CsvTableSource.scala       | 364 -----------------
 .../runtime/stream/sql/TableSourceITCase.scala     |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  25 +-
 8 files changed, 532 insertions(+), 383 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
new file mode 100644
index 0000000..160bc9a
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource
+	implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+	private final CsvInputFormatConfig config;
+
+	/**
+	 * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+	 * (logically) unlimited number of fields.
+	 *
+	 * @param path       The path to the CSV file.
+	 * @param fieldNames The names of the table fields.
+	 * @param fieldTypes The types of the table fields.
+	 */
+	public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+			null, false, null, false);
+	}
+
+	/**
+	 * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+	 * (logically) unlimited number of fields.
+	 *
+	 * @param path            The path to the CSV file.
+	 * @param fieldNames      The names of the table fields.
+	 * @param fieldTypes      The types of the table fields.
+	 * @param fieldDelim      The field delimiter, "," by default.
+	 * @param lineDelim       The row delimiter, "\n" by default.
+	 * @param quoteCharacter  An optional quote character for String values, null by default.
+	 * @param ignoreFirstLine Flag to ignore the first line, false by default.
+	 * @param ignoreComments  An optional prefix to indicate comments, null by default.
+	 * @param lenient         Flag to skip records with parse errors instead of failing, false by
+	 *                        default.
+	 */
+	public CsvTableSource(
+		String path,
+		String[] fieldNames,
+		TypeInformation<?>[] fieldTypes,
+		String fieldDelim,
+		String lineDelim,
+		Character quoteCharacter,
+		boolean ignoreFirstLine,
+		String ignoreComments,
+		boolean lenient) {
+
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			fieldDelim, lineDelim,
+			quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+	}
+
+	/**
+	 * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+	 * (logically) unlimited number of fields.
+	 *
+	 * @param path            The path to the CSV file.
+	 * @param fieldNames      The names of the table fields.
+	 * @param fieldTypes      The types of the table fields.
+	 * @param selectedFields  The fields which will be read and returned by the table source. If
+	 *                        null, all fields are returned.
+	 * @param fieldDelim      The field delimiter, "," by default.
+	 * @param lineDelim       The row delimiter, "\n" by default.
+	 * @param quoteCharacter  An optional quote character for String values, null by default.
+	 * @param ignoreFirstLine Flag to ignore the first line, false by default.
+	 * @param ignoreComments  An optional prefix to indicate comments, null by default.
+	 * @param lenient         Flag to skip records with parse errors instead of failing, false by
+	 *                        default.
+	 */
+	public CsvTableSource(
+		String path,
+		String[] fieldNames,
+		TypeInformation<?>[] fieldTypes,
+		int[] selectedFields,
+		String fieldDelim,
+		String lineDelim,
+		Character quoteCharacter,
+		boolean ignoreFirstLine,
+		String ignoreComments,
+		boolean lenient) {
+		this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields,
+			fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient));
+	}
+
+	private CsvTableSource(CsvInputFormatConfig config) {
+		this.config = config;
+	}
+
+	/**
+	 * Return a new builder that builds a CsvTableSource. For example:
+	 * <pre>
+	 * CsvTableSource source = CsvTableSource.builder()
+	 *     .path("/path/to/your/file.csv")
+	 *     .field("myfield", Types.STRING)
+	 *     .field("myfield2", Types.INT)
+	 *     .build();
+	 * </pre>
+	 *
+	 * @return a new builder to build a CsvTableSource
+	 */
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames());
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return new TableSchema(config.fieldNames, config.fieldTypes);
+	}
+
+	@Override
+	public CsvTableSource projectFields(int[] fields) {
+		if (fields.length == 0) {
+			fields = new int[]{0};
+		}
+		return new CsvTableSource(config.select(fields));
+	}
+
+	@Override
+	public boolean isBounded() {
+		return true;
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+	}
+
+	@Override
+	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+		return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+	}
+
+	@Override
+	public String explainSource() {
+		String[] fields = config.getSelectedFieldNames();
+		return "CsvTableSource(read fields: " + String.join(", ", fields) + ")";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CsvTableSource that = (CsvTableSource) o;
+		return Objects.equals(config, that.config);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(config);
+	}
+
+	/**
+	 * A builder for creating CsvTableSource instances.
+	 */
+	public static class Builder {
+		private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>();
+		private Character quoteCharacter;
+		private String path;
+		private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+		private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+		private boolean isIgnoreFirstLine = false;
+		private String commentPrefix;
+		private boolean lenient = false;
+
+		/**
+		 * Sets the path to the CSV file. Required.
+		 *
+		 * @param path the path to the CSV file
+		 */
+		public Builder path(String path) {
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the field delimiter, "," by default.
+		 *
+		 * @param delim the field delimiter
+		 */
+		public Builder fieldDelimiter(String delim) {
+			this.fieldDelim = delim;
+			return this;
+		}
+
+		/**
+		 * Sets the line delimiter, "\n" by default.
+		 *
+		 * @param delim the line delimiter
+		 */
+		public Builder lineDelimiter(String delim) {
+			this.lineDelim = delim;
+			return this;
+		}
+
+		/**
+		 * Adds a field with the field name and the type information. Required. This method can be
+		 * called multiple times. The call order of this method defines also the order of the fields
+		 * in a row.
+		 *
+		 * @param fieldName the field name
+		 * @param fieldType the type information of the field
+		 */
+		public Builder field(String fieldName, TypeInformation<?> fieldType) {
+			if (schema.containsKey(fieldName)) {
+				throw new IllegalArgumentException("Duplicate field name " + fieldName);
+			}
+			schema.put(fieldName, fieldType);
+			return this;
+		}
+
+		/**
+		 * Sets a quote character for String values, null by default.
+		 *
+		 * @param quote the quote character
+		 */
+		public Builder quoteCharacter(Character quote) {
+			this.quoteCharacter = quote;
+			return this;
+		}
+
+		/**
+		 * Sets a prefix to indicate comments, null by default.
+		 *
+		 * @param prefix the prefix to indicate comments
+		 */
+		public Builder commentPrefix(String prefix) {
+			this.commentPrefix = prefix;
+			return this;
+		}
+
+		/**
+		 * Ignores the first line. The first line is not skipped by default.
+		 */
+		public Builder ignoreFirstLine() {
+			this.isIgnoreFirstLine = true;
+			return this;
+		}
+
+		/**
+		 * Skips records with parse errors instead of failing. An exception is thrown by default.
+		 */
+		public Builder ignoreParseErrors() {
+			this.lenient = true;
+			return this;
+		}
+
+		/**
+		 * Applies the current values and constructs a new CsvTableSource.
+		 *
+		 * @return a newly-created CsvTableSource
+		 */
+		public CsvTableSource build() {
+			if (path == null) {
+				throw new IllegalArgumentException("Path must be defined.");
+			}
+			if (schema.isEmpty()) {
+				throw new IllegalArgumentException("Fields cannot be empty.");
+			}
+			return new CsvTableSource(
+				path,
+				schema.keySet().toArray(new String[0]),
+				schema.values().toArray(new TypeInformation<?>[0]),
+				fieldDelim,
+				lineDelim,
+				quoteCharacter,
+				isIgnoreFirstLine,
+				commentPrefix,
+				lenient);
+		}
+
+	}
+
+	private static class CsvInputFormatConfig implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private final String path;
+		private final String[] fieldNames;
+		private final TypeInformation<?>[] fieldTypes;
+		private final int[] selectedFields;
+
+		private final String fieldDelim;
+		private final String lineDelim;
+		private final Character quoteCharacter;
+		private final boolean ignoreFirstLine;
+		private final String ignoreComments;
+		private final boolean lenient;
+
+		CsvInputFormatConfig(
+			String path,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes,
+			int[] selectedFields,
+			String fieldDelim,
+			String lineDelim,
+			Character quoteCharacter,
+			boolean ignoreFirstLine,
+			String ignoreComments,
+			boolean lenient) {
+
+			this.path = path;
+			this.fieldNames = fieldNames;
+			this.fieldTypes = fieldTypes;
+			this.selectedFields = selectedFields;
+			this.fieldDelim = fieldDelim;
+			this.lineDelim = lineDelim;
+			this.quoteCharacter = quoteCharacter;
+			this.ignoreFirstLine = ignoreFirstLine;
+			this.ignoreComments = ignoreComments;
+			this.lenient = lenient;
+		}
+
+		String[] getSelectedFieldNames() {
+			String[] selectedFieldNames = new String[selectedFields.length];
+			for (int i = 0; i < selectedFields.length; i++) {
+				selectedFieldNames[i] = fieldNames[selectedFields[i]];
+			}
+			return selectedFieldNames;
+		}
+
+		TypeInformation<?>[] getSelectedFieldTypes() {
+			TypeInformation<?>[] selectedFieldTypes = new TypeInformation<?>[selectedFields.length];
+			for (int i = 0; i < selectedFields.length; i++) {
+				selectedFieldTypes[i] = fieldTypes[selectedFields[i]];
+			}
+			return selectedFieldTypes;
+		}
+
+		RowCsvInputFormat createInputFormat() {
+			RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+				new Path(path),
+				getSelectedFieldTypes(),
+				lineDelim,
+				fieldDelim,
+				selectedFields);
+			inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine);
+			inputFormat.setCommentPrefix(ignoreComments);
+			inputFormat.setLenient(lenient);
+			if (quoteCharacter != null) {
+				inputFormat.enableQuotedStringParsing(quoteCharacter);
+			}
+			return inputFormat;
+		}
+
+		CsvInputFormatConfig select(int[] fields) {
+			return new CsvInputFormatConfig(path, fieldNames, fieldTypes, fields,
+				fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			CsvInputFormatConfig that = (CsvInputFormatConfig) o;
+			return ignoreFirstLine == that.ignoreFirstLine &&
+				lenient == that.lenient &&
+				Objects.equals(path, that.path) &&
+				Arrays.equals(fieldNames, that.fieldNames) &&
+				Arrays.equals(fieldTypes, that.fieldTypes) &&
+				Arrays.equals(selectedFields, that.selectedFields) &&
+				Objects.equals(fieldDelim, that.fieldDelim) &&
+				Objects.equals(lineDelim, that.lineDelim) &&
+				Objects.equals(quoteCharacter, that.quoteCharacter) &&
+				Objects.equals(ignoreComments, that.ignoreComments);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = Objects.hash(path, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine,
+				ignoreComments, lenient);
+			result = 31 * result + Arrays.hashCode(fieldNames);
+			result = 31 * result + Arrays.hashCode(fieldTypes);
+			result = 31 * result + Arrays.hashCode(selectedFields);
+			return result;
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 3a5edd3..e4bcc8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -116,7 +116,7 @@ abstract class StreamTableEnvironment(
     // TODO TableSourceUtil.validateTableSource(tableSource)
     tableSource match {
       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded => // ok
+      case _: StreamTableSource[_] => // ok: a stream environment handles both bounded and unbounded sources
       // TODO `TableSourceUtil.hasRowtimeAttribute` depends on [Expression]
       // check that event-time is enabled if table source includes rowtime attributes
       // if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 39c59ed..4abf691 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.api.{DataTypes, TableConfigOptions, TableSchema, T
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
 import org.apache.flink.table.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource}
+import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
@@ -150,4 +149,23 @@ class TableSourceITCase extends BatchTestBase {
         row(8, "Record_8"))
     )
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("csvTable", csvTable)
+    checkResult(
+      "SELECT id, `first`, `last`, score FROM csvTable",
+      Seq(
+        row(1, "Mike", "Smith", 12.3),
+        row(2, "Bob", "Taylor", 45.6),
+        row(3, "Sam", "Miller", 7.89),
+        row(4, "Peter", "Smith", 0.12),
+        row(5, "Liz", "Williams", 34.5),
+        row(6, "Sally", "Miller", 6.78),
+        row(7, "Alice", "Smith", 90.1),
+        row(8, "Kelly", "Williams", 2.34)
+      )
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index b1a2fc4..9f4c2c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -26,12 +26,13 @@ import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
 import org.apache.flink.table.util._
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.Test
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
+import scala.collection.mutable
+
 class TableSourceITCase extends StreamingTestBase {
 
   @Test
@@ -314,4 +315,25 @@ class TableSourceITCase extends StreamingTestBase {
     val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("persons", csvTable)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(
+      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+      .toAppendStream[Row]
+      .addSink(sink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 4c407f8..776d2c4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row
 
+import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.util
 import java.util.{Collections, List => JList}
 
@@ -40,6 +41,50 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+object TestTableSources {
+
+  def getPersonCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first", Types.STRING)
+      .field("id", Types.INT)
+      .field("score",Types.DOUBLE)
+      .field("last",Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
+  }
+
+  private def writeToTempFile(
+      contents: String,
+      filePrefix: String,
+      fileSuffix: String,
+      charset: String = "UTF-8"): String = {
+    val tempFile = File.createTempFile(filePrefix, fileSuffix)
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
+    tmpWriter.write(contents)
+    tmpWriter.close()
+    tempFile.getAbsolutePath
+  }
+}
+
 class TestTableSourceWithTime[T](
     isBatch: Boolean,
     tableSchema: TableSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
deleted file mode 100644
index f7215ee..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableException, TableSchema}
-
-import scala.collection.mutable
-
-/**
-  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-  * (logically) unlimited number of fields.
-  *
-  * @param path The path to the CSV file.
-  * @param fieldNames The names of the table fields.
-  * @param fieldTypes The types of the table fields.
-  * @param selectedFields The fields which will be read and returned by the table source.
-  *                       If None, all fields are returned.
-  * @param fieldDelim The field delimiter, "," by default.
-  * @param rowDelim The row delimiter, "\n" by default.
-  * @param quoteCharacter An optional quote character for String values, null by default.
-  * @param ignoreFirstLine Flag to ignore the first line, false by default.
-  * @param ignoreComments An optional prefix to indicate comments, null by default.
-  * @param lenient Flag to skip records with parse error instead to fail, false by default.
-  */
-class CsvTableSource private (
-    private val path: String,
-    private val fieldNames: Array[String],
-    private val fieldTypes: Array[TypeInformation[_]],
-    private val selectedFields: Array[Int],
-    private val fieldDelim: String,
-    private val rowDelim: String,
-    private val quoteCharacter: Character,
-    private val ignoreFirstLine: Boolean,
-    private val ignoreComments: String,
-    private val lenient: Boolean)
-  extends BatchTableSource[Row]
-  with StreamTableSource[Row]
-  with ProjectableTableSource[Row] {
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    * @param fieldDelim The field delimiter, "," by default.
-    * @param rowDelim The row delimiter, "\n" by default.
-    * @param quoteCharacter An optional quote character for String values, null by default.
-    * @param ignoreFirstLine Flag to ignore the first line, false by default.
-    * @param ignoreComments An optional prefix to indicate comments, null by default.
-    * @param lenient Flag to skip records with parse error instead to fail, false by default.
-    */
-  def this(
-    path: String,
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    quoteCharacter: Character = null,
-    ignoreFirstLine: Boolean = false,
-    ignoreComments: String = null,
-    lenient: Boolean = false) = {
-
-    this(
-      path,
-      fieldNames,
-      fieldTypes,
-      fieldTypes.indices.toArray, // initially, all fields are returned
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-
-  }
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    */
-  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
-    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
-  }
-
-  if (fieldNames.length != fieldTypes.length) {
-    throw new TableException("Number of field names and field types must be equal.")
-  }
-
-  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
-  private val selectedFieldNames = selectedFields.map(fieldNames(_))
-
-  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
-
-  /**
-    * Returns the data of the table as a [[DataSet]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
-  override def getReturnType: RowTypeInfo = returnType
-
-  /**
-    * Returns the data of the table as a [[DataStream]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the schema of the produced table. */
-  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
-
-  /** Returns a copy of [[TableSource]] with ability to project fields */
-  override def projectFields(fields: Array[Int]): CsvTableSource = {
-
-    val selectedFields = if (fields.isEmpty) Array(0) else fields
-
-    new CsvTableSource(
-      path,
-      fieldNames,
-      fieldTypes,
-      selectedFields,
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-  }
-
-  private def createCsvInput(): RowCsvInputFormat = {
-    val inputFormat = new RowCsvInputFormat(
-      new Path(path),
-      selectedFieldTypes,
-      rowDelim,
-      fieldDelim,
-      selectedFields)
-
-    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
-    inputFormat.setLenient(lenient)
-    if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter)
-    }
-    if (ignoreComments != null) {
-      inputFormat.setCommentPrefix(ignoreComments)
-    }
-
-    inputFormat
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CsvTableSource => returnType == that.returnType &&
-        path == that.path &&
-        fieldDelim == that.fieldDelim &&
-        rowDelim == that.rowDelim &&
-        quoteCharacter == that.quoteCharacter &&
-        ignoreFirstLine == that.ignoreFirstLine &&
-        ignoreComments == that.ignoreComments &&
-        lenient == that.lenient
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    returnType.hashCode()
-  }
-
-  override def explainSource(): String = {
-    s"CsvTableSource(" +
-      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
-  }
-}
-
-object CsvTableSource {
-
-  /**
-    * A builder for creating [[CsvTableSource]] instances.
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = new CsvTableSource.builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    *
-    */
-  class Builder {
-
-    private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
-      mutable.LinkedHashMap[String, TypeInformation[_]]()
-    private var quoteCharacter: Character = _
-    private var path: String = _
-    private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
-    private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
-    private var isIgnoreFirstLine: Boolean = false
-    private var commentPrefix: String = _
-    private var lenient: Boolean = false
-
-    /**
-      * Sets the path to the CSV file. Required.
-      *
-      * @param path the path to the CSV file
-      */
-    def path(path: String): Builder = {
-      this.path = path
-      this
-    }
-
-    /**
-      * Sets the field delimiter, "," by default.
-      *
-      * @param delim the field delimiter
-      */
-    def fieldDelimiter(delim: String): Builder = {
-      this.fieldDelim = delim
-      this
-    }
-
-    /**
-      * Sets the line delimiter, "\n" by default.
-      *
-      * @param delim the line delimiter
-      */
-    def lineDelimiter(delim: String): Builder = {
-      this.lineDelim = delim
-      this
-    }
-
-    /**
-      * Adds a field with the field name and the type information. Required.
-      * This method can be called multiple times. The call order of this method defines
-      * also the order of the fields in a row.
-      *
-      * @param fieldName the field name
-      * @param fieldType the type information of the field
-      */
-    def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
-      if (schema.contains(fieldName)) {
-        throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
-      }
-      schema += (fieldName -> fieldType)
-      this
-    }
-
-    /**
-      * Sets a quote character for String values, null by default.
-      *
-      * @param quote the quote character
-      */
-    def quoteCharacter(quote: Character): Builder = {
-      this.quoteCharacter = quote
-      this
-    }
-
-    /**
-      * Sets a prefix to indicate comments, null by default.
-      *
-      * @param prefix the prefix to indicate comments
-      */
-    def commentPrefix(prefix: String): Builder = {
-      this.commentPrefix = prefix
-      this
-    }
-
-    /**
-      * Ignore the first line. Not skip the first line by default.
-      */
-    def ignoreFirstLine(): Builder = {
-      this.isIgnoreFirstLine = true
-      this
-    }
-
-    /**
-      * Skip records with parse error instead to fail. Throw an exception by default.
-      */
-    def ignoreParseErrors(): Builder = {
-      this.lenient = true
-      this
-    }
-
-    /**
-      * Apply the current values and constructs a newly-created [[CsvTableSource]].
-      *
-      * @return a newly-created [[CsvTableSource]].
-      */
-    def build(): CsvTableSource = {
-      if (path == null) {
-        throw new IllegalArgumentException("Path must be defined.")
-      }
-      if (schema.isEmpty) {
-        throw new IllegalArgumentException("Fields can not be empty.")
-      }
-      new CsvTableSource(
-        path,
-        schema.keys.toArray,
-        schema.values.toArray,
-        fieldDelim,
-        lineDelim,
-        quoteCharacter,
-        isIgnoreFirstLine,
-        commentPrefix,
-        lenient)
-    }
-
-  }
-
-  /**
-    * Return a new builder that builds a [[CsvTableSource]].
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = CsvTableSource
-    *     .builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    * @return a new builder to build a [[CsvTableSource]]
-    */
-  def builder(): Builder = new Builder
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index fdf834c..ba55e53 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
+import _root_.scala.collection.mutable
 
 class TableSourceITCase extends AbstractTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index aff208c..0d633fe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -46,20 +46,17 @@ object CommonTestData {
     )
 
     val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
-    new CsvTableSource(
-      tempFilePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        Types.STRING,
-        Types.INT,
-        Types.DOUBLE,
-        Types.STRING
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first",Types.STRING)
+      .field("id",Types.INT)
+      .field("score",Types.DOUBLE)
+      .field("last",Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
   }
 
   def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {