Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/15 09:44:08 UTC

[07/10] flink git commit: [FLINK-6750] [table] [docs] Rework Table Sources & Sinks Page

[FLINK-6750] [table] [docs] Rework Table Sources & Sinks Page

This closes #4094.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8756553
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8756553
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8756553

Branch: refs/heads/master
Commit: d8756553ce490023a017e5927d30c9f178b858d8
Parents: 2324815
Author: twalthr <tw...@apache.org>
Authored: Fri Jun 9 08:25:54 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 15 11:42:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   | 364 +++++++++++++++++--
 docs/dev/table/streaming.md                     |   8 +-
 .../flink/table/sources/TableSource.scala       |   2 +-
 3 files changed, 335 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8756553/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 2d07254..266ab3b 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -22,6 +22,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+A `TableSource` provides access to data stored in external systems (database, key-value store, message queue) or in files. After a [TableSource is registered in a TableEnvironment](common.html#register-a-tablesource), it can be accessed by [Table API](tableApi.html) or [SQL](sql.html) queries.
+
+A `TableSink` [emits a Table](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
+
+Have a look at the [common concepts and API](common.html) page for details on how to [register a TableSource](common.html#register-a-tablesource) and how to [emit a Table through a TableSink](common.html#emit-a-table).
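+
+For example, the following sketch registers a `CsvTableSource`, queries it with the Table API, and emits the result through a `CsvTableSink`. The file paths and field names are only placeholders for illustration:
+
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.sources.CsvTableSource
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register a TableSource under a name
+val ordersSource = CsvTableSource
+  .builder
+  .path("/path/to/orders.csv")   // hypothetical input file
+  .field("user", Types.STRING)
+  .field("amount", Types.INT)
+  .build
+tableEnv.registerTableSource("Orders", ordersSource)
+
+// query the registered table with the Table API
+val result = tableEnv.scan("Orders").filter('amount > 100)
+
+// emit the result through a TableSink
+result.writeToSink(new CsvTableSink("/path/to/result.csv", "|"))
+{% endhighlight %}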
+
 * This will be replaced by the TOC
 {:toc}
 
@@ -30,8 +36,8 @@ Provided TableSources
 
 **TODO: extend and complete**
 
-Currently, Flink provides the `CsvTableSource` to read CSV files and various `TableSources` to read JSON or Avro objects from Kafka.
-A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface.
+Currently, Flink provides the `CsvTableSource` to read CSV files and a few table sources to read JSON or Avro data from Kafka.
+A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. See section on [defining a custom TableSource](#define-a-tablesource) for details.
 
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files.
@@ -94,13 +100,6 @@ By default, a missing JSON field does not fail the source. You can configure thi
 tableSource.setFailOnMissingField(true);
 ```
 
-You can work with the Table as explained in the rest of the Table API guide:
-
-```java
-tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
-Table result = tableEnvironment.scan("kafka-source");
-```
-
 {% top %}
 
 ### KafkaAvroTableSource
@@ -114,6 +113,7 @@ To use the Kafka Avro source, you have to add the Kafka connector dependency to
   - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
 
 You can then create the source as follows (example for Kafka 0.8):
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -195,92 +195,386 @@ val csvTableSource = CsvTableSource
 </div>
 </div>
 
-You can work with the Table as explained in the rest of the Table API guide in both stream and batch `TableEnvironment`s:
+{% top %}
+
+Provided TableSinks
+-------------------
+
+**TODO**
+
+{% top %}
+
+Define a TableSource
+--------------------
+
+A `TableSource` is a generic interface to access data stored in an external system as a table. It produces a `DataSet` or `DataStream` and provides the type information to derive the schema of the generated table. There are different table sources for batch tables and streaming tables.
+
+Schema information consists of a data type, field names, and corresponding indexes of these names in the data type.
+
+The general interface looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-tableEnvironment.registerTableSource("mycsv", csvTableSource);
+TableSource<T> {
 
-Table streamTable = streamTableEnvironment.scan("mycsv");
+  public TypeInformation<T> getReturnType();
 
-Table batchTable = batchTableEnvironment.scan("mycsv");
+  public String explainSource();
+}
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-tableEnvironment.registerTableSource("mycsv", csvTableSource)
+TableSource[T] {
+
+  def getReturnType: TypeInformation[T]
 
-val streamTable = streamTableEnvironment.scan("mycsv")
+  def explainSource: String
 
-val batchTable = batchTableEnvironment.scan("mycsv")
+}
 {% endhighlight %}
 </div>
 </div>
 
-{% top %}
+To define a `TableSource` one needs to implement `TableSource#getReturnType`. In this case field names and field indexes are derived from the returned type.
 
-Provided TableSinks
--------------------
+If the `TypeInformation` returned by `getReturnType` does not allow custom field names to be specified, the `DefinedFieldNames` interface can be implemented in addition.
 
-**TODO**
+### BatchTableSource
 
-{% top %}
+Defines an external `TableSource` to create a batch table and provides access to its data.
 
-Define a TableSource
---------------------
+The interface looks as follows:
 
-### BatchTableSource
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BatchTableSource<T> extends TableSource<T> {
 
-**TODO**
+  public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+BatchTableSource[T] extends TableSource[T] {
+
+  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
+}
+{% endhighlight %}
+</div>
+</div>
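+
+For illustration, a minimal `BatchTableSource` that produces `Row` records might look like the following sketch. The class name, schema, and data are made up for this example; the field names of the resulting table are derived from the returned `RowTypeInfo`:
+
+{% highlight scala %}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
+
+class PersonBatchTableSource extends BatchTableSource[Row] {
+
+  // schema of the table: (name: STRING, age: INT)
+  private val returnType = new RowTypeInfo(
+    Array[TypeInformation[_]](Types.STRING, Types.INT),
+    Array("name", "age"))
+
+  override def getReturnType: TypeInformation[Row] = returnType
+
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+    // a real source would read from an external system or file instead
+    val person = new Row(2)
+    person.setField(0, "Alice")
+    person.setField(1, 30)
+    execEnv.fromCollection(java.util.Collections.singletonList(person), returnType)
+  }
+}
+{% endhighlight %}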
 
 {% top %}
 
 ### StreamTableSource
-* TimestampAssigner
-* DefinedRowtimeAttribute / DefinedProctimeAttribute
 
-**TODO**
+Defines an external `TableSource` to create a streaming table and provides access to its data.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamTableSource<T> extends TableSource<T> {
+
+  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+StreamTableSource[T] extends TableSource[T] {
+
+  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
+}
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** If a Table needs to be processed in event time, the `DataStream` returned by the `getDataStream()` method must carry timestamps and watermarks. Please see the documentation on [timestamp and watermark assignment]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) for details.
+
+**Note:** Time-based operations on streaming tables such as windows in both the [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows) require explicitly specified time attributes. 
+
+- `DefinedRowtimeAttribute` provides the `getRowtimeAttribute()` method to specify the name of the event-time time attribute.
+- `DefinedProctimeAttribute` provides the `getProctimeAttribute()` method to specify the name of the processing-time time attribute.
+
+Please see the documentation on [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes) for details.
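+
+A sketch of a `StreamTableSource` that declares an event-time attribute is shown below. The class name, field names, and the helper that creates the timestamped and watermarked stream are hypothetical:
+
+{% highlight scala %}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource}
+import org.apache.flink.types.Row
+
+class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+
+  // physical schema of the produced rows: (user: STRING, action: STRING)
+  override def getReturnType: TypeInformation[Row] =
+    new RowTypeInfo(
+      Array[TypeInformation[_]](Types.STRING, Types.STRING),
+      Array("user", "action"))
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    // the returned stream must carry timestamps and watermarks for event-time processing
+    createTimestampedUserActionStream(execEnv)
+  }
+
+  // "actionTime" is appended to the table schema as the event-time attribute
+  override def getRowtimeAttribute: String = "actionTime"
+
+  // hypothetical helper that creates a stream with timestamps and watermarks assigned
+  private def createTimestampedUserActionStream(env: StreamExecutionEnvironment): DataStream[Row] = ???
+}
+{% endhighlight %}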
 
 {% top %}
 
 ### ProjectableTableSource
 
-**TODO**
+The `ProjectableTableSource` interface adds support for projection push-down to a `TableSource`. A `TableSource` extending this interface is able to project the fields of the returned table.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ProjectableTableSource<T> {
+
+  public TableSource<T> projectFields(int[] fields);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+ProjectableTableSource[T] {
+
+  def projectFields(fields: Array[Int]): TableSource[T]
+}
+{% endhighlight %}
+</div>
+</div>
+
+The `projectFields()` method is called with an array that holds the indexes of the required fields. It returns a new `TableSource` object that produces rows with the requested schema.
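+
+A sketch of a batch `TableSource` with projection push-down is shown below. The columnar format, field names, and the `readColumns()` helper are hypothetical:
+
+{% highlight scala %}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.types.Row
+
+class ColumnarPersonSource(selectedFields: Array[Int] = Array(0, 1, 2))
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+  // full schema of the underlying data: (name: STRING, city: STRING, age: INT)
+  private val allTypes = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.INT)
+  private val allNames = Array("name", "city", "age")
+
+  // the produced type only contains the selected fields
+  override def getReturnType: TypeInformation[Row] =
+    new RowTypeInfo(selectedFields.map(allTypes(_)), selectedFields.map(allNames(_)))
+
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
+    readColumns(execEnv, selectedFields)
+
+  // called by the optimizer; returns a copy of the source that only reads the requested fields
+  override def projectFields(fields: Array[Int]): ColumnarPersonSource =
+    new ColumnarPersonSource(fields)
+
+  // hypothetical helper that reads only the given columns of the underlying data
+  private def readColumns(env: ExecutionEnvironment, fields: Array[Int]): DataSet[Row] = ???
+}
+{% endhighlight %}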
 
 {% top %}
 
+### NestedFieldsProjectableTableSource
+
+The `NestedFieldsProjectableTableSource` interface adds support for projection push-down to a `TableSource` with nested fields. A `TableSource` extending this interface is able to project the nested fields of the returned table.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+NestedFieldsProjectableTableSource<T> {
+
+  public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+NestedFieldsProjectableTableSource[T] {
+
+  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
+}
+{% endhighlight %}
+</div>
+</div>
+
 ### FilterableTableSource
 
-**TODO**
+The `FilterableTableSource` interface adds support for filter push-down to a `TableSource`. A `TableSource` extending this interface is able to filter records before they are returned.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+FilterableTableSource<T> {
+
+  public TableSource<T> applyPredicate(List<Expression> predicates);
+
+  public boolean isFilterPushedDown();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+FilterableTableSource[T] {
+
+  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
+
+  def isFilterPushedDown: Boolean
+}
+{% endhighlight %}
+</div>
+</div>
+
+The optimizer pushes predicates down by calling the `applyPredicate()` method. The `TableSource` can decide which predicates to evaluate itself and which to leave to the framework. Predicates that are evaluated by the `TableSource` must be removed from the `List`; all predicates that remain in the `List` after the method returns are evaluated by the framework. The `applyPredicate()` method returns a new `TableSource` that evaluates all selected predicates.
+
+The `isFilterPushedDown()` method tells the optimizer whether predicates have been pushed down or not.
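+
+A sketch of a batch `TableSource` that can push down a simple `amount > <literal>` predicate is shown below. The class name, schema, and the `readOrders()` helper are hypothetical:
+
+{% highlight scala %}
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.expressions.{Expression, GreaterThan, Literal, ResolvedFieldReference}
+import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource}
+import org.apache.flink.types.Row
+
+class OrderSource(minAmount: Option[Int] = None)
+  extends BatchTableSource[Row] with FilterableTableSource[Row] {
+
+  override def getReturnType: TypeInformation[Row] =
+    new RowTypeInfo(Array[TypeInformation[_]](Types.STRING, Types.INT), Array("user", "amount"))
+
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
+    readOrders(execEnv, minAmount)
+
+  override def applyPredicate(predicates: JList[Expression]): OrderSource = {
+    var pushedDown: Option[Int] = None
+    val it = predicates.iterator()
+    while (it.hasNext && pushedDown.isEmpty) {
+      it.next() match {
+        // consume the predicate and remove it from the list; it is evaluated by this source
+        case GreaterThan(ResolvedFieldReference("amount", _), Literal(value: Int, _)) =>
+          pushedDown = Some(value)
+          it.remove()
+        case _ => // leave all other predicates to the framework
+      }
+    }
+    new OrderSource(pushedDown)
+  }
+
+  override def isFilterPushedDown: Boolean = minAmount.isDefined
+
+  // hypothetical helper that reads the orders and applies the pushed-down filter
+  private def readOrders(env: ExecutionEnvironment, min: Option[Int]): DataSet[Row] = ???
+}
+{% endhighlight %}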
 
 {% top %}
 
 Define a TableSink
 ------------------
 
+A `TableSink` specifies how to emit a `Table` to an external system or location. The interface is generic such that it can support different storage locations and formats. There are different table sinks for batch tables and streaming tables.
+
+The general interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSink<T> {
+
+  public TypeInformation<T> getOutputType();
+
+  public String[] getFieldNames();
+
+  public TypeInformation[] getFieldTypes();
+
+  public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+TableSink[T] {
+
+  def getOutputType: TypeInformation[T]
+
+  def getFieldNames: Array[String]
+
+  def getFieldTypes: Array[TypeInformation]
+
+  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
+}
+{% endhighlight %}
+</div>
+</div>
+
+The `TableSink#configure` method is called to pass the schema of the `Table` to emit (field names and types) to the `TableSink`. The method must return a new instance of the `TableSink` which is configured to emit the provided `Table` schema.
+
 ### BatchTableSink
 
-**TODO**
+Defines an external `TableSink` to emit a batch table.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BatchTableSink<T> extends TableSink<T> {
+
+  public void emitDataSet(DataSet<T> dataSet);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+BatchTableSink[T] extends TableSink[T] {
+
+  def emitDataSet(dataSet: DataSet[T]): Unit
+}
+{% endhighlight %}
+</div>
+</div>
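+
+A sketch of a simple `BatchTableSink` that writes a batch table as text files is shown below. The class name and output path are hypothetical; note how `configure()` returns a copy of the sink that holds the schema of the table to emit:
+
+{% highlight scala %}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.types.Row
+
+class TextBatchTableSink(
+    path: String,
+    fieldNames: Array[String] = null,
+    fieldTypes: Array[TypeInformation[_]] = null)
+  extends BatchTableSink[Row] {
+
+  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  // returns a copy of the sink configured with the schema of the table to emit
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
+    new TextBatchTableSink(path, fieldNames, fieldTypes)
+
+  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+    // write every Row using its toString() representation
+    dataSet.writeAsText(path)
+  }
+}
+{% endhighlight %}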
 
 {% top %}
 
 ### AppendStreamTableSink
 
-**TODO**
+Defines an external `TableSink` to emit a streaming table with only insert changes.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AppendStreamTableSink<T> extends TableSink<T> {
+
+  public void emitDataStream(DataStream<T> dataStream);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+AppendStreamTableSink[T] extends TableSink[T] {
+
+  def emitDataStream(dataStream: DataStream[T]): Unit
+}
+{% endhighlight %}
+</div>
+</div>
+
+If the table is also modified by update or delete changes, a `TableException` will be thrown.
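+
+A minimal sketch of an `AppendStreamTableSink` that prints every inserted row (the class name is hypothetical):
+
+{% highlight scala %}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSink}
+import org.apache.flink.types.Row
+
+class PrintAppendTableSink(
+    fieldNames: Array[String] = null,
+    fieldTypes: Array[TypeInformation[_]] = null)
+  extends AppendStreamTableSink[Row] {
+
+  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
+    new PrintAppendTableSink(fieldNames, fieldTypes)
+
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    // only insert changes arrive here
+    dataStream.print()
+  }
+}
+{% endhighlight %}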
 
 {% top %}
 
 ### RetractStreamTableSink
 
-**TODO**
+Defines an external `TableSink` to emit a streaming table with insert, update, and delete changes.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+RetractStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
+
+  public TypeInformation<T> getRecordType();
+
+  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
+
+  def getRecordType: TypeInformation[T]
+
+  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
+}
+{% endhighlight %}
+</div>
+</div>
+
+The table will be converted into a stream of accumulate and retract messages which are encoded as a Java `Tuple2`. The first field is a boolean flag that indicates the message type (`true` for accumulate/insert, `false` for retract/delete). The second field holds the record of the requested type `T`.
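+
+A sketch of a `RetractStreamTableSink` that simply prints the encoded messages (the class name is hypothetical):
+
+{% highlight scala %}
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.sinks.{RetractStreamTableSink, TableSink}
+import org.apache.flink.types.Row
+
+class PrintRetractTableSink(
+    fieldNames: Array[String] = null,
+    fieldTypes: Array[TypeInformation[_]] = null)
+  extends RetractStreamTableSink[Row] {
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] =
+    new PrintRetractTableSink(fieldNames, fieldTypes)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    // Tuple2.f0 == true  -> accumulate (insert) the record in Tuple2.f1
+    // Tuple2.f0 == false -> retract (delete) the record in Tuple2.f1
+    dataStream.print()
+  }
+}
+{% endhighlight %}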
 
 {% top %}
 
-### UpsertStreamTableSInk
+### UpsertStreamTableSink
 
-**TODO**
+Defines an external `TableSink` to emit a streaming table with insert, update, and delete changes.
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+UpsertStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
+
+  public void setKeyFields(String[] keys);
+
+  public void setIsAppendOnly(boolean isAppendOnly);
+
+  public TypeInformation<T> getRecordType();
+
+  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
+
+  def setKeyFields(keys: Array[String]): Unit
+
+  def setIsAppendOnly(isAppendOnly: Boolean): Unit
+
+  def getRecordType: TypeInformation[T]
+
+  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
+}
+{% endhighlight %}
+</div>
+</div>
+
+The table must have unique key fields (atomic or composite) or be append-only. If the table does not have a unique key and is not append-only, a `TableException` will be thrown. The unique key of the table is configured by the `UpsertStreamTableSink#setKeyFields()` method.
+
+The table will be converted into a stream of upsert and delete messages which are encoded as a Java `Tuple2`. The first field is a boolean flag to indicate the message type. The second field holds the record of the requested type `T`.
+
+A message with a `true` flag is an upsert message for the configured key. A message with a `false` flag is a delete message for the configured key. If the table is append-only, all messages will have a `true` flag and must be interpreted as insertions.
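+
+To illustrate how the messages could be interpreted, the following sketch shows a `SinkFunction` that an `UpsertStreamTableSink` implementation might register on the stream in `emitDataStream()`. The `keyOf` function, which extracts the configured key fields from a `Row`, is hypothetical:
+
+{% highlight scala %}
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.types.Row
+
+class UpsertPrintSinkFunction(keyOf: Row => String)
+  extends RichSinkFunction[JTuple2[JBool, Row]] {
+
+  override def invoke(value: JTuple2[JBool, Row]): Unit = {
+    if (value.f0) {
+      // true flag: upsert the record under its key
+      println(s"UPSERT key=${keyOf(value.f1)} row=${value.f1}")
+    } else {
+      // false flag: delete the record for its key
+      println(s"DELETE key=${keyOf(value.f1)}")
+    }
+  }
+}
+{% endhighlight %}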
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8756553/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index d7d97fa..c7f070b 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -27,10 +27,12 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-Dynamic table
+Dynamic Table
 -------------
 
-**TO BE DONE**
+This section will be reworked soon. Until then, please read the [introductory blog post about Dynamic Tables](http://flink.apache.org/news/2017/04/04/dynamic-tables.html).
+
+**TO BE DONE:**
 
 * Stream -> Table
 * Table -> Stream
@@ -76,7 +78,7 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
 </div>
 </div>
 
-Time-based operations such as windows in both the [Table API]({{ site.baseurl }}/dev/table/tableApi.html#windows) and [SQL]({{ site.baseurl }}/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
+Time-based operations such as windows in both the [Table API]({{ site.baseurl }}/dev/table/tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing corresponding timestamps in table programs.
 
 Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can be used in time-based operations.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8756553/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index c41582e..d9ebc5a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
   * Schema information consists of a data type, field names, and corresponding indices of
   * these names in the data type.
   *
-  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * To define a TableSource one needs to implement [[TableSource#getReturnType]]. In this case
   * field names and field indices are derived from the returned type.
   *
  * In case custom field names are required one needs to additionally implement