You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/15 15:15:00 UTC

flink git commit: [FLINK-5714] [table] Use a builder pattern for creating CsvTableSource

Repository: flink
Updated Branches:
  refs/heads/master 04e6758ab -> 6c310a762


[FLINK-5714] [table] Use a builder pattern for creating CsvTableSource

This closes #3273.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c310a76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c310a76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c310a76

Branch: refs/heads/master
Commit: 6c310a7628f6da09289dedc465db9294507fb10e
Parents: 04e6758
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Mon Feb 6 21:57:04 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Wed Feb 15 16:07:17 2017 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  73 ++++---
 .../flink/table/sources/CsvTableSource.scala    | 190 ++++++++++++++++++-
 .../table/api/scala/batch/TableSourceTest.scala |  56 +++++-
 3 files changed, 270 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 0efd258..6e516fc 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -253,56 +253,51 @@ Table result = tableEnvironment.ingest("kafka-source");
 
 The `CsvTableSource` is already included in `flink-table` without additional dependecies.
 
-It can be configured with the following properties:
-
- - `path` The path to the CSV file, required.
- - `fieldNames` The names of the table fields, required.
- - `fieldTypes` The types of the table fields, required.
- - `fieldDelim` The field delimiter, `","` by default.
- - `rowDelim` The row delimiter, `"\n"` by default.
- - `quoteCharacter` An optional quote character for String values, `null` by default.
- - `ignoreFirstLine` Flag to ignore the first line, `false` by default.
- - `ignoreComments` An optional prefix to indicate comments, `null` by default.
- - `lenient` Flag to skip records with parse error instead to fail, `false` by default.
+The easiest way to create a `CsvTableSource` is by using the enclosed builder `CsvTableSource.builder()`. The builder has the following methods to configure properties:
+
+ - `path(String path)` Sets the path to the CSV file, required.
+ - `field(String fieldName, TypeInformation<?> fieldType)` Adds a field with the given name and type information. Required; can be called multiple times. The call order of this method also defines the order of the fields in a row.
+ - `fieldDelimiter(String delim)` Sets the field delimiter, `","` by default.
+ - `lineDelimiter(String delim)` Sets the line delimiter, `"\n"` by default.
+ - `quoteCharacter(Character quote)` Sets the quote character for String values, `null` by default.
+ - `commentPrefix(String prefix)` Sets a prefix to indicate comments, `null` by default.
+ - `ignoreFirstLine()` Ignore the first line. Disabled by default.
+ - `ignoreParseErrors()` Skips records with parse errors instead of failing. By default, an exception is thrown.
 
 You can create the source as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-CsvTableSource csvTableSource = new CsvTableSource(
-    "/path/to/your/file.csv",
-    new String[] { "name", "id", "score", "comments" },
-    new TypeInformation<?>[] {
-      Types.STRING(),
-      Types.INT(),
-      Types.DOUBLE(),
-      Types.STRING()
-    },
-    "#",    // fieldDelim
-    "$",    // rowDelim
-    null,   // quoteCharacter
-    true,   // ignoreFirstLine
-    "%",    // ignoreComments
-    false); // lenient
+CsvTableSource csvTableSource = CsvTableSource
+    .builder()
+    .path("/path/to/your/file.csv")
+    .field("name", Types.STRING())
+    .field("id", Types.INT())
+    .field("score", Types.DOUBLE())
+    .field("comments", Types.STRING())
+    .fieldDelimiter("#")
+    .lineDelimiter("$")
+    .ignoreFirstLine()
+    .ignoreParseErrors()
+    .commentPrefix("%").build();
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val csvTableSource = new CsvTableSource(
-    "/path/to/your/file.csv",
-    Array("name", "id", "score", "comments"),
-    Array(
-      Types.STRING,
-      Types.INT,
-      Types.DOUBLE,
-      Types.STRING
-    ),
-    fieldDelim = "#",
-    rowDelim = "$",
-    ignoreFirstLine = true,
-    ignoreComments = "%")
+val csvTableSource = CsvTableSource
+    .builder
+    .path("/path/to/your/file.csv")
+    .field("name", Types.STRING)
+    .field("id", Types.INT)
+    .field("score", Types.DOUBLE)
+    .field("comments", Types.STRING)
+    .fieldDelimiter("#")
+    .lineDelimiter("$")
+    .ignoreFirstLine
+    .ignoreParseErrors
+    .commentPrefix("%").build()
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index f59a331..8a458ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableException
 
+import scala.collection.mutable
+
 /**
   * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
   * (logically) unlimited number of fields.
@@ -44,15 +46,15 @@ import org.apache.flink.table.api.TableException
   * @param lenient Flag to skip records with parse error instead to fail, false by default.
   */
 class CsvTableSource(
-    path: String,
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    quoteCharacter: Character = null,
-    ignoreFirstLine: Boolean = false,
-    ignoreComments: String = null,
-    lenient: Boolean = false)
+    private val path: String,
+    private val fieldNames: Array[String],
+    private val fieldTypes: Array[TypeInformation[_]],
+    private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+    private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
+    private val quoteCharacter: Character = null,
+    private val ignoreFirstLine: Boolean = false,
+    private val ignoreComments: String = null,
+    private val lenient: Boolean = false)
   extends BatchTableSource[Row]
   with StreamTableSource[Row]
   with ProjectableTableSource[Row] {
@@ -138,4 +140,174 @@ class CsvTableSource(
 
     inputFormat
   }
+
+  override def equals(other: Any): Boolean = other match {
+    case that: CsvTableSource => returnType == that.returnType &&
+        path == that.path &&
+        fieldDelim == that.fieldDelim &&
+        rowDelim == that.rowDelim &&
+        quoteCharacter == that.quoteCharacter &&
+        ignoreFirstLine == that.ignoreFirstLine &&
+        ignoreComments == that.ignoreComments &&
+        lenient == that.lenient
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    returnType.hashCode()
+  }
+}
+
+object CsvTableSource {
+
+  /**
+    * A builder for creating [[CsvTableSource]] instances.
+    *
+    * For example:
+    *
+    * {{{
+    *   val source: CsvTableSource = CsvTableSource.builder()
+    *     .path("/path/to/your/file.csv")
+    *     .field("myfield", Types.STRING)
+    *     .field("myfield2", Types.INT)
+    *     .build()
+    * }}}
+    *
+    */
+  class Builder {
+
+    private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
+      mutable.LinkedHashMap[String, TypeInformation[_]]()
+    private var quoteCharacter: Character = _
+    private var path: String = _
+    private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
+    private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
+    private var isIgnoreFirstLine: Boolean = false
+    private var commentPrefix: String = _
+    private var lenient: Boolean = false
+
+    /**
+      * Sets the path to the CSV file. Required.
+      *
+      * @param path the path to the CSV file
+      */
+    def path(path: String): Builder = {
+      this.path = path
+      this
+    }
+
+    /**
+      * Sets the field delimiter, "," by default.
+      *
+      * @param delim the field delimiter
+      */
+    def fieldDelimiter(delim: String): Builder = {
+      this.fieldDelim = delim
+      this
+    }
+
+    /**
+      * Sets the line delimiter, "\n" by default.
+      *
+      * @param delim the line delimiter
+      */
+    def lineDelimiter(delim: String): Builder = {
+      this.lineDelim = delim
+      this
+    }
+
+    /**
+      * Adds a field with the field name and the type information. Required.
+      * This method can be called multiple times. The call order of this method also
+      * defines the order of the fields in a row.
+      *
+      * @param fieldName the field name
+      * @param fieldType the type information of the field
+      */
+    def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
+      if (schema.contains(fieldName)) {
+        throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+      }
+      schema += (fieldName -> fieldType)
+      this
+    }
+
+    /**
+      * Sets a quote character for String values, null by default.
+      *
+      * @param quote the quote character
+      */
+    def quoteCharacter(quote: Character): Builder = {
+      this.quoteCharacter = quote
+      this
+    }
+
+    /**
+      * Sets a prefix to indicate comments, null by default.
+      *
+      * @param prefix the prefix to indicate comments
+      */
+    def commentPrefix(prefix: String): Builder = {
+      this.commentPrefix = prefix
+      this
+    }
+
+    /**
+      * Ignores the first line. By default, the first line is not skipped.
+      */
+    def ignoreFirstLine(): Builder = {
+      this.isIgnoreFirstLine = true
+      this
+    }
+
+    /**
+      * Skips records with parse errors instead of failing. By default, an exception is thrown.
+      */
+    def ignoreParseErrors(): Builder = {
+      this.lenient = true
+      this
+    }
+
+    /**
+      * Applies the current values and constructs a new [[CsvTableSource]].
+      *
+      * @return a newly-created [[CsvTableSource]].
+      */
+    def build(): CsvTableSource = {
+      if (path == null) {
+        throw new IllegalArgumentException("Path must be defined.")
+      }
+      if (schema.isEmpty) {
+        throw new IllegalArgumentException("Fields can not be empty.")
+      }
+      new CsvTableSource(
+        path,
+        schema.keys.toArray,
+        schema.values.toArray,
+        fieldDelim,
+        lineDelim,
+        quoteCharacter,
+        isIgnoreFirstLine,
+        commentPrefix,
+        lenient)
+    }
+
+  }
+
+  /**
+    * Return a new builder that builds a [[CsvTableSource]].
+    *
+    * For example:
+    *
+    * {{{
+    *   val source: CsvTableSource = CsvTableSource
+    *     .builder()
+    *     .path("/path/to/your/file.csv")
+    *     .field("myfield", Types.STRING)
+    *     .field("myfield2", Types.INT)
+    *     .build()
+    * }}}
+    * @return a new builder to build a [[CsvTableSource]]
+    */
+  def builder(): Builder = new Builder
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c310a76/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
index 609dc91..670e268 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -18,11 +18,12 @@
 
 package org.apache.flink.table.api.scala.batch
 
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sources.CsvTableSource
 import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
 import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
+import org.junit.{Assert, Test}
 
 class TableSourceTest extends TableTestBase {
 
@@ -139,6 +140,59 @@ class TableSourceTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
+  @Test
+  def testCsvTableSourceBuilder(): Unit = {
+    val source1 = CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .quoteCharacter(';')
+      .fieldDelimiter("#")
+      .lineDelimiter("\r\n")
+      .commentPrefix("%%")
+      .ignoreFirstLine()
+      .ignoreParseErrors()
+      .build()
+
+    val source2 = new CsvTableSource(
+      "/path/to/csv",
+      Array("myfield", "myfield2"),
+      Array(Types.STRING, Types.INT),
+      "#",
+      "\r\n",
+      ';',
+      true,
+      "%%",
+      true)
+
+    Assert.assertEquals(source1, source2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail, path is not defined
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, fields cannot be empty
+      .build()
+  }
+
   def tableSource: (CsvTableSource, String) = {
     val csvTable = CommonTestData.getCsvTableSource
     val tableName = "csvTable"