You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/15 15:15:00 UTC
flink git commit: [FLINK-5714] [table] Use a builder pattern for
creating CsvTableSource
Repository: flink
Updated Branches:
refs/heads/master 04e6758ab -> 6c310a762
[FLINK-5714] [table] Use a builder pattern for creating CsvTableSource
This closes #3273.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c310a76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c310a76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c310a76
Branch: refs/heads/master
Commit: 6c310a7628f6da09289dedc465db9294507fb10e
Parents: 04e6758
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Mon Feb 6 21:57:04 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Wed Feb 15 16:07:17 2017 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 73 ++++---
.../flink/table/sources/CsvTableSource.scala | 190 ++++++++++++++++++-
.../table/api/scala/batch/TableSourceTest.scala | 56 +++++-
3 files changed, 270 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 0efd258..6e516fc 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -253,56 +253,51 @@ Table result = tableEnvironment.ingest("kafka-source");
The `CsvTableSource` is already included in `flink-table` without additional dependecies.
-It can be configured with the following properties:
-
- - `path` The path to the CSV file, required.
- - `fieldNames` The names of the table fields, required.
- - `fieldTypes` The types of the table fields, required.
- - `fieldDelim` The field delimiter, `","` by default.
- - `rowDelim` The row delimiter, `"\n"` by default.
- - `quoteCharacter` An optional quote character for String values, `null` by default.
- - `ignoreFirstLine` Flag to ignore the first line, `false` by default.
- - `ignoreComments` An optional prefix to indicate comments, `null` by default.
- - `lenient` Flag to skip records with parse error instead to fail, `false` by default.
+The easiest way to create a `CsvTableSource` is by using the enclosed builder `CsvTableSource.builder()`, the builder has the following methods to configure properties:
+
+ - `path(String path)` Sets the path to the CSV file, required.
+ - `field(String fieldName, TypeInformation<?> fieldType)` Adds a field with the field name and field type information, can be called multiple times, required. The call order of this method defines also the order of the fields in a row.
+ - `fieldDelimiter(String delim)` Sets the field delimiter, `","` by default.
+ - `lineDelimiter(String delim)` Sets the line delimiter, `"\n"` by default.
+ - `quoteCharacter(Character quote)` Sets the quote character for String values, `null` by default.
+ - `commentPrefix(String prefix)` Sets a prefix to indicate comments, `null` by default.
+ - `ignoreFirstLine()` Ignore the first line. Disabled by default.
+ - `ignoreParseErrors()` Skip records with parse error instead to fail. Throwing an exception by default.
You can create the source as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-CsvTableSource csvTableSource = new CsvTableSource(
- "/path/to/your/file.csv",
- new String[] { "name", "id", "score", "comments" },
- new TypeInformation<?>[] {
- Types.STRING(),
- Types.INT(),
- Types.DOUBLE(),
- Types.STRING()
- },
- "#", // fieldDelim
- "$", // rowDelim
- null, // quoteCharacter
- true, // ignoreFirstLine
- "%", // ignoreComments
- false); // lenient
+CsvTableSource csvTableSource = CsvTableSource
+ .builder()
+ .path("/path/to/your/file.csv")
+ .field("name", Types.STRING())
+ .field("id", Types.INT())
+ .field("score", Types.DOUBLE())
+ .field("comments", Types.STRING())
+ .fieldDelimiter("#")
+ .lineDelimiter("$")
+ .ignoreFirstLine()
+ .ignoreParseErrors()
+ .commentPrefix("%");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val csvTableSource = new CsvTableSource(
- "/path/to/your/file.csv",
- Array("name", "id", "score", "comments"),
- Array(
- Types.STRING,
- Types.INT,
- Types.DOUBLE,
- Types.STRING
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%")
+val csvTableSource = CsvTableSource
+ .builder
+ .path("/path/to/your/file.csv")
+ .field("name", Types.STRING)
+ .field("id", Types.INT)
+ .field("score", Types.DOUBLE)
+ .field("comments", Types.STRING)
+ .fieldDelimiter("#")
+ .lineDelimiter("$")
+ .ignoreFirstLine
+ .ignoreParseErrors
+ .commentPrefix("%")
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index f59a331..8a458ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableException
+import scala.collection.mutable
+
/**
* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
* (logically) unlimited number of fields.
@@ -44,15 +46,15 @@ import org.apache.flink.table.api.TableException
* @param lenient Flag to skip records with parse error instead to fail, false by default.
*/
class CsvTableSource(
- path: String,
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]],
- fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
- rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
- quoteCharacter: Character = null,
- ignoreFirstLine: Boolean = false,
- ignoreComments: String = null,
- lenient: Boolean = false)
+ private val path: String,
+ private val fieldNames: Array[String],
+ private val fieldTypes: Array[TypeInformation[_]],
+ private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+ private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ private val quoteCharacter: Character = null,
+ private val ignoreFirstLine: Boolean = false,
+ private val ignoreComments: String = null,
+ private val lenient: Boolean = false)
extends BatchTableSource[Row]
with StreamTableSource[Row]
with ProjectableTableSource[Row] {
@@ -138,4 +140,174 @@ class CsvTableSource(
inputFormat
}
+
+ override def equals(other: Any): Boolean = other match {
+ case that: CsvTableSource => returnType == that.returnType &&
+ path == that.path &&
+ fieldDelim == that.fieldDelim &&
+ rowDelim == that.rowDelim &&
+ quoteCharacter == that.quoteCharacter &&
+ ignoreFirstLine == that.ignoreFirstLine &&
+ ignoreComments == that.ignoreComments &&
+ lenient == that.lenient
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ returnType.hashCode()
+ }
+}
+
+object CsvTableSource {
+
+ /**
+ * A builder for creating [[CsvTableSource]] instances.
+ *
+ * For example:
+ *
+ * {{{
+ * val source: CsvTableSource = new CsvTableSource.builder()
+ * .path("/path/to/your/file.csv")
+ * .field("myfield", Types.STRING)
+ * .field("myfield2", Types.INT)
+ * .build()
+ * }}}
+ *
+ */
+ class Builder {
+
+ private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
+ mutable.LinkedHashMap[String, TypeInformation[_]]()
+ private var quoteCharacter: Character = _
+ private var path: String = _
+ private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
+ private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
+ private var isIgnoreFirstLine: Boolean = false
+ private var commentPrefix: String = _
+ private var lenient: Boolean = false
+
+ /**
+ * Sets the path to the CSV file. Required.
+ *
+ * @param path the path to the CSV file
+ */
+ def path(path: String): Builder = {
+ this.path = path
+ this
+ }
+
+ /**
+ * Sets the field delimiter, "," by default.
+ *
+ * @param delim the field delimiter
+ */
+ def fieldDelimiter(delim: String): Builder = {
+ this.fieldDelim = delim
+ this
+ }
+
+ /**
+ * Sets the line delimiter, "\n" by default.
+ *
+ * @param delim the line delimiter
+ */
+ def lineDelimiter(delim: String): Builder = {
+ this.lineDelim = delim
+ this
+ }
+
+ /**
+ * Adds a field with the field name and the type information. Required.
+ * This method can be called multiple times. The call order of this method defines
+ * also the order of thee fields in a row.
+ *
+ * @param fieldName the field name
+ * @param fieldType the type information of the field
+ */
+ def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
+ if (schema.contains(fieldName)) {
+ throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+ }
+ schema += (fieldName -> fieldType)
+ this
+ }
+
+ /**
+ * Sets a quote character for String values, null by default.
+ *
+ * @param quote the quote character
+ */
+ def quoteCharacter(quote: Character): Builder = {
+ this.quoteCharacter = quote
+ this
+ }
+
+ /**
+ * Sets a prefix to indicate comments, null by default.
+ *
+ * @param prefix the prefix to indicate comments
+ */
+ def commentPrefix(prefix: String): Builder = {
+ this.commentPrefix = prefix
+ this
+ }
+
+ /**
+ * Ignore the first line. Not skip the first line by default.
+ */
+ def ignoreFirstLine(): Builder = {
+ this.isIgnoreFirstLine = true
+ this
+ }
+
+ /**
+ * Skip records with parse error instead to fail. Throw an exception by default.
+ */
+ def ignoreParseErrors(): Builder = {
+ this.lenient = true
+ this
+ }
+
+ /**
+ * Apply the current values and constructs a newly-created [[CsvTableSource]].
+ *
+ * @return a newly-created [[CsvTableSource]].
+ */
+ def build(): CsvTableSource = {
+ if (path == null) {
+ throw new IllegalArgumentException("Path must be defined.")
+ }
+ if (schema.isEmpty) {
+ throw new IllegalArgumentException("Fields can not be empty.")
+ }
+ new CsvTableSource(
+ path,
+ schema.keys.toArray,
+ schema.values.toArray,
+ fieldDelim,
+ lineDelim,
+ quoteCharacter,
+ isIgnoreFirstLine,
+ commentPrefix,
+ lenient)
+ }
+
+ }
+
+ /**
+ * Return a new builder that builds a [[CsvTableSource]].
+ *
+ * For example:
+ *
+ * {{{
+ * val source: CsvTableSource = CsvTableSource
+ * .builder()
+ * .path("/path/to/your/file.csv")
+ * .field("myfield", Types.STRING)
+ * .field("myfield2", Types.INT)
+ * .build()
+ * }}}
+ * @return a new builder to build a [[CsvTableSource]]
+ */
+ def builder(): Builder = new Builder
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
index 609dc91..670e268 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -18,11 +18,12 @@
package org.apache.flink.table.api.scala.batch
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
+import org.junit.{Assert, Test}
class TableSourceTest extends TableTestBase {
@@ -139,6 +140,59 @@ class TableSourceTest extends TableTestBase {
util.verifyTable(result, expected)
}
+ @Test
+ def testCsvTableSourceBuilder(): Unit = {
+ val source1 = CsvTableSource.builder()
+ .path("/path/to/csv")
+ .field("myfield", Types.STRING)
+ .field("myfield2", Types.INT)
+ .quoteCharacter(';')
+ .fieldDelimiter("#")
+ .lineDelimiter("\r\n")
+ .commentPrefix("%%")
+ .ignoreFirstLine()
+ .ignoreParseErrors()
+ .build()
+
+ val source2 = new CsvTableSource(
+ "/path/to/csv",
+ Array("myfield", "myfield2"),
+ Array(Types.STRING, Types.INT),
+ "#",
+ "\r\n",
+ ';',
+ true,
+ "%%",
+ true)
+
+ Assert.assertEquals(source1, source2)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithNullPath(): Unit = {
+ CsvTableSource.builder()
+ .field("myfield", Types.STRING)
+ // should fail, path is not defined
+ .build()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+ CsvTableSource.builder()
+ .path("/path/to/csv")
+ .field("myfield", Types.STRING)
+ // should fail, field name must no be duplicate
+ .field("myfield", Types.INT)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+ CsvTableSource.builder()
+ .path("/path/to/csv")
+ // should fail, field can be empty
+ .build()
+ }
+
def tableSource: (CsvTableSource, String) = {
val csvTable = CommonTestData.getCsvTableSource
val tableName = "csvTable"