Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/09 07:48:24 UTC

[flink] branch release-1.11 updated: [FLINK-18502][FLINK-18505][docs] Add the missing 'legacySourceSinks.zh.md' page and synchronize content of 'sourceSinks.zh.md'

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 262bcab  [FLINK-18502][FLINK-18505][docs] Add the missing 'legacySourceSinks.zh.md' page and synchronize content of 'sourceSinks.zh.md'
262bcab is described below

commit 262bcab24e4311c0dc0686162256dab1eecdfd5f
Author: Roc Marshal <64...@users.noreply.github.com>
AuthorDate: Thu Jul 9 15:43:40 2020 +0800

    [FLINK-18502][FLINK-18505][docs] Add the missing 'legacySourceSinks.zh.md' page and synchronize content of 'sourceSinks.zh.md'
    
    This closes #12854
---
 docs/dev/table/legacySourceSinks.md                |   28 +-
 ...egacySourceSinks.md => legacySourceSinks.zh.md} |   28 +-
 docs/dev/table/sourceSinks.md                      |    4 +-
 docs/dev/table/sourceSinks.zh.md                   | 1162 +++++++++-----------
 4 files changed, 573 insertions(+), 649 deletions(-)

diff --git a/docs/dev/table/legacySourceSinks.md b/docs/dev/table/legacySourceSinks.md
index b58eabd..3d0d0a8 100644
--- a/docs/dev/table/legacySourceSinks.md
+++ b/docs/dev/table/legacySourceSinks.md
@@ -20,13 +20,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-A `TableSource` provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a [TableSource is registered in a TableEnvironment](common.html#register-a-tablesource) it can be accessed by [Table API](tableApi.html) or [SQL]({{ site.baseurl }}/dev/table/sql/queries.html) queries.
+A `TableSource` provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a [TableSource is registered in a TableEnvironment]({% link dev/table/common.md %}#register-a-tablesource) it can be accessed by [Table API]({% link dev/table/tableApi.md %}) or [SQL]({% link dev/table/sql/queries.md %}) queries.
 
-A `TableSink` [emits a Table](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
+A `TableSink` [emits a Table]({% link dev/table/common.md %}#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
 
-A `TableFactory` allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a `Descriptor` or via YAML configuration files for the [SQL Client](sqlClient.html).
+A `TableFactory` allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a `Descriptor` or via YAML configuration files for the [SQL Client]({% link dev/table/sqlClient.md %}).
 
-Have a look at the [common concepts and API](common.html) page for details how to [register a TableSource](common.html#register-a-tablesource) and how to [emit a Table through a TableSink](common.html#emit-a-table). See the [built-in sources, sinks, and formats](connect.html) page for examples how to use factories.
+Have a look at the [common concepts and API]({% link dev/table/common.md %}) page for details on how to [register a TableSource]({% link dev/table/common.md %}#register-a-tablesource) and how to [emit a Table through a TableSink]({% link dev/table/common.md %}#emit-a-table). See the [built-in sources, sinks, and formats]({% link dev/table/connect.md %}) page for examples of how to use factories.
 
 * This will be replaced by the TOC
 {:toc}
@@ -69,7 +69,7 @@ TableSource[T] {
 </div>
 </div>
 
-* `getTableSchema()`: Returns the schema of the produced table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `DataType` (see [Table API types]({{ site.baseurl }}/dev/table/types.html) and [SQL types]({{ site.baseurl }}/dev/table/sql/index.html#data-types)). Note that the returned `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSource`.
+* `getTableSchema()`: Returns the schema of the produced table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `DataType` (see [Table API types]({% link dev/table/types.md %}) and [SQL types]({% link dev/table/sql/index.md %}#data-types)). Note that the returned `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSource`.
 
 * `getReturnType()`: Returns the physical type of the `DataStream` (`StreamTableSource`) or `DataSet` (`BatchTableSource`) and the records that are produced by the `TableSource`.
 
@@ -103,7 +103,7 @@ BatchTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can by created using a regular [data source]({{ site.baseurl }}/dev/batch/#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping a `InputFormat` or [batch connector]({{ site.baseurl }}/dev/batch/connectors.html).
+* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can be created using a regular [data source]({% link dev/batch/index.md %}#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping an `InputFormat` or [batch connector]({% link dev/batch/connectors.md %}).
 
 {% top %}
 
@@ -131,19 +131,19 @@ StreamTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can by created using a regular [data source]({{ site.baseurl }}/dev/datastream_api.html#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({{ site.baseurl }}/dev/connectors/).
+* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can be created using a regular [data source]({% link dev/datastream_api.md %}#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({% link dev/connectors/index.md %}).
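For illustration, a minimal `StreamTableSource` that wraps a source function might look like the following sketch (the `MySourceFunction` class and the two-column schema are hypothetical):

{% highlight java %}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyStreamTableSource implements StreamTableSource<Row> {

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // the type of the DataStream must match getReturnType()
    return execEnv.addSource(new MySourceFunction(), getReturnType());
  }

  @Override
  public TypeInformation<Row> getReturnType() {
    return Types.ROW(
        new String[] {"name", "score"},
        new TypeInformation<?>[] {Types.STRING(), Types.INT()});
  }

  @Override
  public TableSchema getTableSchema() {
    return TableSchema.builder()
        .field("name", DataTypes.STRING())
        .field("score", DataTypes.INT())
        .build();
  }
}
{% endhighlight %}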
 
 {% top %}
 
 ### Defining a TableSource with Time Attributes
 
-Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql/queries.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes](streaming/time_attributes.html).
+Time-based operations of streaming [Table API]({% link dev/table/tableApi.md %}#group-windows) and [SQL]({% link dev/table/sql/queries.md %}#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes]({% link dev/table/streaming/time_attributes.md %}).
 
 A `TableSource` defines a time attribute as a field of type `Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a `TableSource` defines a time attribute by implementing a certain interface.
 
 #### Defining a Processing Time Attribute
 
-[Processing time attributes](streaming/time_attributes.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
+[Processing time attributes]({% link dev/table/streaming/time_attributes.md %}#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -171,7 +171,7 @@ DefinedProctimeAttribute {
 
 #### Defining a Rowtime Attribute
 
-[Rowtime attributes](streaming/time_attributes.html#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
+[Rowtime attributes]({% link dev/table/streaming/time_attributes.md %}#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
 
 A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribute by specifying 
 
@@ -332,7 +332,7 @@ FilterableTableSource[T] {
 
 <span class="label label-danger">Attention</span> This is an experimental feature. The interface may be changed in future versions. It's only supported in Blink planner.
 
-The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax](streaming/joins.html).
+The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax]({% link dev/table/streaming/joins.md %}).
 
 The interface looks as follows:
 
@@ -365,7 +365,7 @@ LookupableTableSource[T] extends TableSource[T] {
 </div>
 
 * `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which used to lookup the matched row(s) via lookup keys. The lookupkeys are the field names of `LookupableTableSource` in the join equal conditions. The eval method parameters of the returned `TableFunction`'s should be in the order which `lookupkeys` defined. It is recommended to define the parameters in varargs (e.g. `eval(Object... lookupkeys)` to match all the cases). The return type of the `TableFunction` must be identical [...]
-* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O]({{ site.baseurl }}/dev/stream/operators/asyncio.html). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O]({% link dev/stream/operators/asyncio.md %}). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection<String>>  [...]
 * `isAsyncEnabled()`: Returns true if async lookup is enabled. It requires that `getAsyncLookupFunction(lookupkeys)` be implemented if `isAsyncEnabled` returns true.
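For illustration, a synchronous lookup function returned by `getLookupFunction(lookupkeys)` might look like the following sketch (the in-memory map is a stand-in for a real client of the external system):

{% highlight java %}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MyLookupFunction extends TableFunction<Row> {

  private transient Map<Object, Row> cache;

  @Override
  public void open(FunctionContext context) {
    // e.g. open a connection to the external system or pre-load a small dimension table
    cache = new HashMap<>();
  }

  // varargs so that the same eval method matches any number of lookup keys
  public void eval(Object... lookupKeys) {
    Row row = cache.get(lookupKeys[0]);
    if (row != null) {
      collect(row);
    }
  }
}
{% endhighlight %}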
 
 {% top %}
@@ -710,7 +710,7 @@ connector.debug=true
 
 ### Use a TableFactory in the Table & SQL API
 
-For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.
+For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors]({% link dev/table/connect.md %}) for sources, sinks, and formats as a reference.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -812,4 +812,4 @@ st_env\
 
 </div>
 
-{% top %}
\ No newline at end of file
+{% top %}
diff --git a/docs/dev/table/legacySourceSinks.md b/docs/dev/table/legacySourceSinks.zh.md
similarity index 90%
copy from docs/dev/table/legacySourceSinks.md
copy to docs/dev/table/legacySourceSinks.zh.md
index b58eabd..a410594 100644
--- a/docs/dev/table/legacySourceSinks.md
+++ b/docs/dev/table/legacySourceSinks.zh.md
@@ -20,13 +20,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-A `TableSource` provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a [TableSource is registered in a TableEnvironment](common.html#register-a-tablesource) it can be accessed by [Table API](tableApi.html) or [SQL]({{ site.baseurl }}/dev/table/sql/queries.html) queries.
+A `TableSource` provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a [TableSource is registered in a TableEnvironment]({% link dev/table/common.zh.md %}#register-a-tablesource) it can be accessed by [Table API]({% link dev/table/tableApi.zh.md %}) or [SQL]({% link dev/table/sql/queries.zh.md %}) queries.
 
-A `TableSink` [emits a Table](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
+A `TableSink` [emits a Table]({% link dev/table/common.zh.md %}#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
 
-A `TableFactory` allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a `Descriptor` or via YAML configuration files for the [SQL Client](sqlClient.html).
+A `TableFactory` allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a `Descriptor` or via YAML configuration files for the [SQL Client]({% link dev/table/sqlClient.zh.md %}).
 
-Have a look at the [common concepts and API](common.html) page for details how to [register a TableSource](common.html#register-a-tablesource) and how to [emit a Table through a TableSink](common.html#emit-a-table). See the [built-in sources, sinks, and formats](connect.html) page for examples how to use factories.
+Have a look at the [common concepts and API]({% link dev/table/common.zh.md %}) page for details on how to [register a TableSource]({% link dev/table/common.zh.md %}#register-a-tablesource) and how to [emit a Table through a TableSink]({% link dev/table/common.zh.md %}#emit-a-table). See the [built-in sources, sinks, and formats]({% link dev/table/connect.zh.md %}) page for examples of how to use factories.
 
 * This will be replaced by the TOC
 {:toc}
@@ -69,7 +69,7 @@ TableSource[T] {
 </div>
 </div>
 
-* `getTableSchema()`: Returns the schema of the produced table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `DataType` (see [Table API types]({{ site.baseurl }}/dev/table/types.html) and [SQL types]({{ site.baseurl }}/dev/table/sql/index.html#data-types)). Note that the returned `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSource`.
+* `getTableSchema()`: Returns the schema of the produced table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `DataType` (see [Table API types]({% link dev/table/types.zh.md %}) and [SQL types]({% link dev/table/sql/index.zh.md %}#data-types)). Note that the returned `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSource`.
 
 * `getReturnType()`: Returns the physical type of the `DataStream` (`StreamTableSource`) or `DataSet` (`BatchTableSource`) and the records that are produced by the `TableSource`.
 
@@ -103,7 +103,7 @@ BatchTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can by created using a regular [data source]({{ site.baseurl }}/dev/batch/#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping a `InputFormat` or [batch connector]({{ site.baseurl }}/dev/batch/connectors.html).
+* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can be created using a regular [data source]({% link dev/batch/index.zh.md %}#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping an `InputFormat` or [batch connector]({% link dev/batch/connectors.zh.md %}).
 
 {% top %}
 
@@ -131,19 +131,19 @@ StreamTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can by created using a regular [data source]({{ site.baseurl }}/dev/datastream_api.html#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({{ site.baseurl }}/dev/connectors/).
+* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can be created using a regular [data source]({% link dev/datastream_api.zh.md %}#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({% link dev/connectors/index.zh.md %}).
 
 {% top %}
 
 ### Defining a TableSource with Time Attributes
 
-Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql/queries.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes](streaming/time_attributes.html).
+Time-based operations of streaming [Table API]({% link dev/table/tableApi.zh.md %}#group-windows) and [SQL]({% link dev/table/sql/queries.zh.md %}#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes]({% link dev/table/streaming/time_attributes.zh.md %}).
 
 A `TableSource` defines a time attribute as a field of type `Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a `TableSource` defines a time attribute by implementing a certain interface.
 
 #### Defining a Processing Time Attribute
 
-[Processing time attributes](streaming/time_attributes.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
+[Processing time attributes]({% link dev/table/streaming/time_attributes.zh.md %}#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -171,7 +171,7 @@ DefinedProctimeAttribute {
 
 #### Defining a Rowtime Attribute
 
-[Rowtime attributes](streaming/time_attributes.html#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
+[Rowtime attributes]({% link dev/table/streaming/time_attributes.zh.md %}#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
 
 A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribute by specifying 
 
@@ -332,7 +332,7 @@ FilterableTableSource[T] {
 
 <span class="label label-danger">Attention</span> This is an experimental feature. The interface may be changed in future versions. It's only supported in Blink planner.
 
-The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax](streaming/joins.html).
+The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax]({% link dev/table/streaming/joins.zh.md %}).
 
 The interface looks as follows:
 
@@ -365,7 +365,7 @@ LookupableTableSource[T] extends TableSource[T] {
 </div>
 
 * `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which used to lookup the matched row(s) via lookup keys. The lookupkeys are the field names of `LookupableTableSource` in the join equal conditions. The eval method parameters of the returned `TableFunction`'s should be in the order which `lookupkeys` defined. It is recommended to define the parameters in varargs (e.g. `eval(Object... lookupkeys)` to match all the cases). The return type of the `TableFunction` must be identical [...]
-* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O]({{ site.baseurl }}/dev/stream/operators/asyncio.html). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O]({% link dev/stream/operators/asyncio.zh.md %}). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection<String [...]
 * `isAsyncEnabled()`: Returns true if async lookup is enabled. It requires that `getAsyncLookupFunction(lookupkeys)` be implemented if `isAsyncEnabled` returns true.
 
 {% top %}
@@ -710,7 +710,7 @@ connector.debug=true
 
 ### Use a TableFactory in the Table & SQL API
 
-For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.
+For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors]({% link dev/table/connect.zh.md %}) for sources, sinks, and formats as a reference.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -812,4 +812,4 @@ st_env\
 
 </div>
 
-{% top %}
\ No newline at end of file
+{% top %}
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 6b1df8a..0c735b2 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -60,7 +60,7 @@ The filled arrows show how objects are transformed to other objects from one sta
 the translation process.
 
 <div style="text-align: center">
-  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" />
+  <img width="90%" src="{% link /fig/table_connectors.svg %}" alt="Translation of table connectors" />
 </div>
 
 ### Metadata
@@ -621,7 +621,7 @@ public class ChangelogCsvDeserializer implements DeserializationSchema<RowData>
 
   @Override
   public void open(InitializationContext context) {
-    // converters must be opened
+    // converters must be open
     converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
   }
 
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index e9f2c77..43d84c9 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -22,796 +22,720 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-A `TableSource` provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a [TableSource is registered in a TableEnvironment](common.html#register-a-tablesource) it can be accessed by [Table API](tableApi.html) or [SQL]({{ site.baseurl }}/dev/table/sql/queries.html) queries.
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded
+data in a unified fashion.
 
-A `TableSink` [emits a Table](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
+Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files.
 
-A `TableFactory` allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a `Descriptor` or via YAML configuration files for the [SQL Client](sqlClient.html).
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In
+the documentation, sources and sinks are often summarized under the term _connector_.
 
-Have a look at the [common concepts and API](common.html) page for details how to [register a TableSource](common.html#register-a-tablesource) and how to [emit a Table through a TableSink](common.html#emit-a-table). See the [built-in sources, sinks, and formats](connect.html) page for examples how to use factories.
+Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.zh.md %})
+for more information about built-in table sources and sinks.
 
-* This will be replaced by the TOC
-{:toc}
+This page focuses on how to develop a custom, user-defined connector.
 
-Define a TableSource
---------------------
+<span class="label label-danger">Attention</span> New table source and table sink interfaces have been
+introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look
+at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.zh.md %}). Those interfaces
+are still supported for backwards compatibility.
 
-A `TableSource` is a generic interface that gives Table API and SQL queries access to data stored in an external system. It provides the schema of the table and the records that are mapped to rows with the table's schema. Depending on whether the `TableSource` is used in a streaming or batch query, the records are produced as a `DataSet` or `DataStream`. 
+* This will be replaced by the TOC
+{:toc}
 
-If a `TableSource` is used in a streaming query it must implement the `StreamTableSource` interface, if it is used in a batch query it must implement the `BatchTableSource` interface. A `TableSource` can also implement both interfaces and be used in streaming and batch queries. 
+Overview
+--------
 
-`StreamTableSource` and `BatchTableSource` extend the base interface `TableSource` that defines the following methods:
+In many cases, implementers don't need to create a new connector from scratch but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, implementers would like to
+create specialized connectors.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-TableSource<T> {
+This section helps with both kinds of use cases. It explains the general architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the cluster.
 
-  public TableSchema getTableSchema();
+The filled arrows show how objects are transformed to other objects from one stage to the next stage during
+the translation process.
 
-  public TypeInformation<T> getReturnType();
-
-  public String explainSource();
-}
-{% endhighlight %}
+<div style="text-align: center">
+  <img width="90%" src="{% link /fig/table_connectors.svg %}" alt="Translation of table connectors" />
 </div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-TableSource[T] {
-
-  def getTableSchema: TableSchema
+### Metadata
 
-  def getReturnType: TypeInformation[T]
+Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
 
-  def explainSource: String
+For most catalog implementations, physical data in the external system is not modified for such an
+operation. Connector-specific dependencies don't have to be present in the classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
 
-}
-{% endhighlight %}
-</div>
-</div>
-
-* `getTableSchema()`: Returns the schema of the produced table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `DataType` (see [Table API types]({{ site.baseurl }}/dev/table/types.html) and [SQL types]({{ site.baseurl }}/dev/table/sql/index.html#data-types)). Note that the returned `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSource`.
+The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary.
 
-* `getReturnType()`: Returns the physical type of the `DataStream` (`StreamTableSource`) or `DataSet` (`BatchTableSource`) and the records that are produced by the `TableSource`.
+### Planning
 
-* `explainSource()`: Returns a String that describes the `TableSource`. This method is optional and used for display purposes only.
+When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` query) and `DynamicTableSink` (for writing in
+an `INSERT INTO` statement).
 
-The `TableSource` interface separates the logical table schema from the physical type of the returned `DataStream` or `DataSet`. As a consequence, all fields of the table schema (`getTableSchema()`) must be mapped to a field with corresponding type of the physical return type (`getReturnType()`). By default, this mapping is done based on field names. For example, a `TableSource` that defines a table schema with two fields `[name: String, size: Integer]` requires a `TypeInformation` with  [...]
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized instance of the table
+connector.
 
-However, some types, such as Tuple or CaseClass types, do support custom field names. If a `TableSource` returns a `DataStream` or `DataSet` of a type with fixed field names, it can implement the `DefinedFieldMapping` interface to map field names from the table schema to field names of the physical return type.
+By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory
+identifier.
 
-### Defining a BatchTableSource
+Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing
+the actual data.
 
-The `BatchTableSource` interface extends the `TableSource` interface and defines one additional method:
+The planner uses the source and sink instances to perform connector-specific bidirectional communication
+until an optimal logical plan is found. Depending on the optionally declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and
+thus mutate the produced runtime implementation.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-BatchTableSource<T> implements TableSource<T> {
-
-  public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
-}
-{% endhighlight %}
-</div>
+### Runtime
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-BatchTableSource[T] extends TableSource[T] {
+Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`.
 
-  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
-}
-{% endhighlight %}
-</div>
-</div>
+Those interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`,
+`LookupRuntimeProvider`, and `SinkRuntimeProvider`.
 
-* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can by created using a regular [data source]({{ site.baseurl }}/dev/batch/#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping a `InputFormat` or [batch connector]({{ site.baseurl }}/dev/batch/connectors.html).
+For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`
+that the planner can handle.
 
 {% top %}
 
-### Defining a StreamTableSource
+Extension Points
+----------------
 
-The `StreamTableSource` interface extends the `TableSource` interface and defines one additional method:
+This section explains the available interfaces for extending Flink's table connectors.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamTableSource<T> implements TableSource<T> {
+### Dynamic Table Factories
 
-  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
-}
-{% endhighlight %}
-</div>
+Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog
+and session information.
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-StreamTableSource[T] extends TableSource[T] {
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`.
 
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
-}
-{% endhighlight %}
-</div>
-</div>
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`.
 
-* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can by created using a regular [data source]({{ site.baseurl }}/dev/datastream_api.html#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({{ site.baseurl }}/dev/connectors/).
+By default, the factory is discovered using the value of the `connector` option as the factory identifier
+and Java's Service Provider Interface.
 
-{% top %}
+In JAR files, references to new implementations can be added to the service file:
 
-### Defining a TableSource with Time Attributes
+`META-INF/services/org.apache.flink.table.factories.Factory`
 
-Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL]({{ site.baseurl }}/dev/table/sql/queries.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes](streaming/time_attributes.html).
+The framework will check for a single matching factory that is uniquely identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
 
-A `TableSource` defines a time attribute as a field of type `Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a `TableSource` defines a time attribute by implementing a certain interface.
+The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a
+catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`.
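For illustration, a minimal source factory for a hypothetical `custom` connector might look like the following sketch (the `hostname` option and the `CustomDynamicTableSource` class are assumptions of this sketch); the fully qualified factory class name must additionally be listed in `META-INF/services/org.apache.flink.table.factories.Factory`:

{% highlight java %}
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class CustomDynamicTableSourceFactory implements DynamicTableSourceFactory {

  public static final ConfigOption<String> HOSTNAME =
      ConfigOptions.key("hostname").stringType().noDefaultValue();

  @Override
  public String factoryIdentifier() {
    return "custom"; // matched against 'connector' = 'custom'
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return new HashSet<>();
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // validate all options against requiredOptions() and optionalOptions()
    final FactoryUtil.TableFactoryHelper helper =
        FactoryUtil.createTableFactoryHelper(this, context);
    helper.validate();

    final ReadableConfig options = helper.getOptions();
    return new CustomDynamicTableSource(options.get(HOSTNAME));
  }
}
{% endhighlight %}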
 
-#### Defining a Processing Time Attribute
+### Dynamic Table Source
 
-[Processing time attributes](streaming/time_attributes.html#processing-time) are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A `TableSource` defines a processing time attribute by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows:
+By definition, a dynamic table can change over time.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DefinedProctimeAttribute {
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog
+  is exhausted. This is represented by the `ScanTableSource` interface.
+- A continuously changing or very large external table whose content is usually never read entirely
+  but queried for individual values when necessary. This is represented by the `LookupTableSource`
+  interface.
 
-  public String getProctimeAttribute();
-}
-{% endhighlight %}
-</div>
+A class can implement both of these interfaces at the same time. The planner decides about their usage depending
+on the specified query.
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-DefinedProctimeAttribute {
+#### Scan Table Source
 
-  def getProctimeAttribute: String
-}
-{% endhighlight %}
-</div>
-</div>
+A `ScanTableSource` scans all rows from an external storage system during runtime.
 
-* `getProctimeAttribute()`: Returns the name of the processing time attribute. The specified attribute must be defined of type `Types.SQL_TIMESTAMP` in the table schema and can be used in time-based operations. A `DefinedProctimeAttribute` table source can define no processing time attribute by returning `null`.
+The scanned rows don't have to contain only insertions but can also contain updates and deletions. Thus,
+the table source can be used to read a (finite or infinite) changelog. The returned _changelog mode_ indicates
+the set of changes that the planner can expect during runtime.
 
-<span class="label label-danger">Attention</span> Both `StreamTableSource` and `BatchTableSource` can implement `DefinedProctimeAttribute` and define a processing time attribute. In case of a `BatchTableSource` the processing time field is initialized with the current timestamp during the table scan.
+For regular batch scenarios, the source can emit a bounded stream of insert-only rows.
 
-#### Defining a Rowtime Attribute
+For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows.
 
-[Rowtime attributes](streaming/time_attributes.html#event-time) are attributes of type `TIMESTAMP` and handled in a unified way in stream and batch queries.
+For change data capture (CDC) scenarios, the source can emit bounded or unbounded streams with insert,
+update, and delete rows.
 
-A table schema field of type `SQL_TIMESTAMP` can be declared as rowtime attribute by specifying
+A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might
+mutate an instance during planning. All abilities are listed in the `org.apache.flink.table.connector.source.abilities`
+package and in the documentation of `org.apache.flink.table.connector.source.ScanTableSource`.
 
-* the name of the field,
-* a `TimestampExtractor` that computes the actual value for the attribute (usually from one or more other fields), and
-* a `WatermarkStrategy` that specifies how watermarks are generated for the the rowtime attribute.
+The runtime implementation of a `ScanTableSource` must produce internal data structures. Thus, records
+must be emitted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such
+that a source can still work on common data structures and perform a conversion at the end.
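Continuing the hypothetical `custom` connector from the factory sketch above, a minimal insert-only `ScanTableSource` could be wired up as follows (`MySocketSourceFunction` is an assumed `SourceFunction<RowData>`):

{% highlight java %}
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class CustomDynamicTableSource implements ScanTableSource {

  private final String hostname;

  public CustomDynamicTableSource(String hostname) {
    this.hostname = hostname;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // this source only produces INSERT rows
    return ChangelogMode.insertOnly();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    final SourceFunction<RowData> sourceFunction = new MySocketSourceFunction(hostname);
    return SourceFunctionProvider.of(sourceFunction, false); // false = unbounded
  }

  @Override
  public DynamicTableSource copy() {
    return new CustomDynamicTableSource(hostname);
  }

  @Override
  public String asSummaryString() {
    return "Custom Table Source";
  }
}
{% endhighlight %}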
 
-A `TableSource` defines a rowtime attribute by implementing the `DefinedRowtimeAttributes` interface. The interface looks as follows:
+#### Lookup Table Source
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DefinedRowtimeAttribute {
-
-  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
-}
-{% endhighlight %}
-</div>
+A `LookupTableSource` looks up rows of an external storage system by one or more keys during runtime.
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-DefinedRowtimeAttributes {
+Compared to `ScanTableSource`, the source does not have to read the entire table and can lazily fetch individual
+values from a (possibly continuously changing) external table when necessary.
 
-  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
-}
-{% endhighlight %}
-</div>
-</div>
+Compared to `ScanTableSource`, a `LookupTableSource` currently only supports emitting insert-only changes.
 
-* `getRowtimeAttributeDescriptors()`: Returns a list of `RowtimeAttributeDescriptor`. A `RowtimeAttributeDescriptor` describes a rowtime attribute with the following properties:
-  * `attributeName`: The name of the rowtime attribute in the table schema. The field must be defined with type `Types.SQL_TIMESTAMP`.
-  * `timestampExtractor`: The timestamp extractor extracts the timestamp from a record with the return type. For example, it can convert a Long field into a timestamp or parse a String-encoded timestamp. Flink comes with a set of built-in `TimestampExtractor` implementation for common use cases. It is also possible to provide a custom implementation.
-  * `watermarkStrategy`: The watermark strategy defines how watermarks are generated for the rowtime attribute. Flink comes with a set of built-in `WatermarkStrategy` implementations for common use cases. It is also possible to provide a custom implementation.
+Further abilities are not supported. See the documentation of `org.apache.flink.table.connector.source.LookupTableSource`
+for more information.
 
-<span class="label label-danger">Attention</span> Although the `getRowtimeAttributeDescriptors()` method returns a list of descriptors, only a single rowtime attribute is support at the moment. We plan to remove this restriction in the future and support tables with more than one rowtime attribute.
+The runtime implementation of a `LookupTableSource` is a `TableFunction` or `AsyncTableFunction`. The function
+will be called with values for the given lookup keys during runtime.
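For illustration, a minimal `LookupTableSource` might provide its runtime function as follows (`MyRowDataLookupFunction` is an assumed `TableFunction<RowData>` that performs the per-key lookup):

{% highlight java %}
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;

public class CustomLookupTableSource implements LookupTableSource {

  @Override
  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // context.getKeys() describes which fields of the table are used as lookup keys
    return TableFunctionProvider.of(new MyRowDataLookupFunction(context.getKeys()));
  }

  @Override
  public DynamicTableSource copy() {
    return new CustomLookupTableSource();
  }

  @Override
  public String asSummaryString() {
    return "Custom Lookup Table Source";
  }
}
{% endhighlight %}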
 
-<span class="label label-danger">Attention</span> Both, `StreamTableSource` and `BatchTableSource`, can implement `DefinedRowtimeAttributes` and define a rowtime attribute. In either case, the rowtime field is extracted using the `TimestampExtractor`. Hence, a `TableSource` that implements `StreamTableSource` and `BatchTableSource` and defines a rowtime attribute provides exactly the same data to streaming and batch queries.
+### Dynamic Table Sink
 
-##### Provided Timestamp Extractors
+By definition, a dynamic table can change over time.
 
-Flink provides `TimestampExtractor` implementations for common use cases.
+When writing a dynamic table, the content can always be considered as a changelog (finite or infinite)
+for which all changes are written out continuously until the changelog is exhausted. The returned _changelog mode_
+indicates the set of changes that the sink accepts during runtime.
 
-The following `TimestampExtractor` implementations are currently available:
+For regular batch scenarios, the sink can solely accept insert-only rows and write out bounded streams.
 
-* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG`, `SQL_TIMESTAMP`, or timestamp formatted `STRING` field. One example of such a string would be '2018-05-28 12:34:56.000'.
-* `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note, this `TimestampExtractor` is not available for batch table sources.
+For regular streaming scenarios, the sink can solely accept insert-only rows and can write out unbounded streams.
 
-A custom `TimestampExtractor` can be defined by implementing the corresponding interface.
+For change data capture (CDC) scenarios, the sink can write out bounded or unbounded streams with insert,
+update, and delete rows.
 
-##### Provided Watermark Strategies
+A table sink can implement further ability interfaces such as `SupportsOverwrite` that might mutate an
+instance during planning. All abilities are listed in the `org.apache.flink.table.connector.sink.abilities`
+package and in the documentation of `org.apache.flink.table.connector.sink.DynamicTableSink`.
 
-Flink provides `WatermarkStrategy` implementations for common use cases.
+The runtime implementation of a `DynamicTableSink` must consume internal data structures. Thus, records
+must be accepted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such
+that a sink can still work on common data structures and perform a conversion at the beginning.
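For illustration, a minimal insert-only `DynamicTableSink` might look like the following sketch (`MyExternalRowWriter` is an assumed `SinkFunction<RowData>` that uses the provided converter to obtain external data structures):

{% highlight java %}
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.DataType;

public class CustomDynamicTableSink implements DynamicTableSink {

  private final DataType consumedDataType;

  public CustomDynamicTableSink(DataType consumedDataType) {
    this.consumedDataType = consumedDataType;
  }

  @Override
  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // accept only INSERT changes, regardless of what the query produces
    return ChangelogMode.insertOnly();
  }

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // converter from internal RowData to external data structures (e.g. Row)
    final DataStructureConverter converter =
        context.createDataStructureConverter(consumedDataType);
    return SinkFunctionProvider.of(new MyExternalRowWriter(converter));
  }

  @Override
  public DynamicTableSink copy() {
    return new CustomDynamicTableSink(consumedDataType);
  }

  @Override
  public String asSummaryString() {
    return "Custom Table Sink";
  }
}
{% endhighlight %}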
 
-The following `WatermarkStrategy` implementations are currently available:
+### Encoding / Decoding Formats
 
-* `AscendingTimestamps`: A watermark strategy for ascending timestamps. Records with timestamps that are out-of-order will be considered late.
-* `BoundedOutOfOrderTimestamps(delay)`: A watermark strategy for timestamps that are at most out-of-order by the specified delay.
-* `PreserveWatermarks()`: A strategy which indicates the watermarks should be preserved from the underlying `DataStream`.
+Some table connectors accept different formats that encode and decode keys and/or values.
 
-A custom `WatermarkStrategy` can be defined by implementing the corresponding interface.
+Formats work similarly to the pattern `DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider`,
+where the factory is responsible for translating options and the source is responsible for creating runtime logic.
 
-{% top %}
+Because formats might be located in different modules, they are discovered using Java's Service Provider
+Interface, similarly to [table factories](#dynamic-table-factories). In order to discover a format factory,
+the dynamic table factory searches for a factory that corresponds to a factory identifier and connector-specific
+base class.
 
-### Defining a TableSource with Projection Push-Down
+For example, the Kafka table source requires a `DeserializationSchema` as runtime interface for a decoding
+format. Therefore, the Kafka table source factory uses the value of the `value.format` option to discover
+a `DeserializationFormatFactory`.
 
-A `TableSource` supports projection push-down by implementing the `ProjectableTableSource` interface. The interface defines a single method:
+The following format factories are currently supported:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ProjectableTableSource<T> {
+```
+org.apache.flink.table.factories.DeserializationFormatFactory
+org.apache.flink.table.factories.SerializationFormatFactory
+```
 
-  public TableSource<T> projectFields(int[] fields);
-}
-{% endhighlight %}
-</div>
+The format factory translates the options into an `EncodingFormat` or a `DecodingFormat`. Those interfaces are
+another kind of factory that produces specialized format runtime logic for the given data type.
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-ProjectableTableSource[T] {
-
-  def projectFields(fields: Array[Int]): TableSource[T]
-}
-{% endhighlight %}
-</div>
-</div>
-
-* `projectFields(fields)`: Returns a *copy* of the `TableSource` with adjusted physical return type. The `fields` parameter provides the indexes of the fields that must be provided by the `TableSource`. The indexes relate to the `TypeInformation` of the physical return type, *not* to the logical table schema. The copied `TableSource` must adjust its return type and the returned `DataStream` or `DataSet`. The `TableSchema` of the copied `TableSource` must not be changed, i.e, it must be t [...]
-
-<span class="label label-danger">Attention</span> In order for Flink to distinguish a projection push-down table source from its original form, `explainSource` method must be override to include information regarding the projected fields.
-
-The `ProjectableTableSource` adds support to project flat fields. If the `TableSource` defines a table with nested schema, it can implement the `NestedFieldsProjectableTableSource` to extend the projection to nested fields. The `NestedFieldsProjectableTableSource` is defined as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-NestedFieldsProjectableTableSource<T> {
-
-  public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-NestedFieldsProjectableTableSource[T] {
-
-  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
-}
-{% endhighlight %}
-</div>
-</div>
-
-* `projectNestedField(fields, nestedFields)`: Returns a *copy* of the `TableSource` with adjusted physical return type. Fields of the physical return type may be removed or reordered but their type must not be changed. The contract of this method is essentially the same as for the `ProjectableTableSource.projectFields()` method. In addition, the `nestedFields` parameter contains for each field index in the `fields` list, a list of paths to all nested fields that are accessed by the query [...]
-
-<span class="label label-danger">Attention</span> the types of the projected fields must not be changed but unused fields may be set to null or to a default value.
-
-{% top %}
-
-### Defining a TableSource with Filter Push-Down
-
-The `FilterableTableSource` interface adds support for filter push-down to a `TableSource`. A `TableSource` extending this interface is able to filter records such that the returned `DataStream` or `DataSet` returns fewer records.
-
-The interface looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-FilterableTableSource<T> {
-
-  public TableSource<T> applyPredicate(List<Expression> predicates);
-
-  public boolean isFilterPushedDown();
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-FilterableTableSource[T] {
-
-  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
-
-  def isFilterPushedDown: Boolean
-}
-{% endhighlight %}
-</div>
-</div>
-
-* `applyPredicate(predicates)`: Returns a *copy* of the `TableSource` with added predicates. The `predicates` parameter is a mutable list of conjunctive predicates that are "offered" to the `TableSource`. The `TableSource` accepts to evaluate a predicate by removing it from the list. Predicates that are left in the list will be evaluated by a subsequent filter operator.
-* `isFilterPushedDown()`: Returns true if the `applyPredicate()` method was called before. Hence, `isFilterPushedDown()` must return true for all `TableSource` instances returned from a `applyPredicate()` call.
-
-<span class="label label-danger">Attention</span> In order for Flink to distinguish a filter push-down table source from its original form, `explainSource` method must be override to include information regarding the push-down filters.
-
-{% top %}
-
-### Defining a TableSource for Lookups
-
-<span class="label label-danger">Attention</span> This is an experimental feature. The interface may be changed in future versions. It's only supported in Blink planner.
-
-The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax](streaming/joins.html).
-
-The interface looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-LookupableTableSource<T> implements TableSource<T> {
-
-  public TableFunction<T> getLookupFunction(String[] lookupkeys);
-
-  public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
-
-  public boolean isAsyncEnabled();
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-LookupableTableSource[T] extends TableSource[T] {
-
-  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
-
-  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
-
-  def isAsyncEnabled: Boolean
-}
-{% endhighlight %}
-</div>
-</div>
-
-* `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which used to lookup the matched row(s) via lookup keys. The lookupkeys are the field names of `LookupableTableSource` in the join equal conditions. The eval method parameters of the returned `TableFunction`'s should be in the order which `lookupkeys` defined. It is recommended to define the parameters in varargs (e.g. `eval(Object... lookupkeys)` to match all the cases). The return type of the `TableFunction` must be identical [...]
-* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O]({{ site.baseurl }}/dev/stream/operators/asyncio.html). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection [...]
-* `isAsyncEnabled()`: Returns true if async lookup is enabled. It requires `getAsyncLookupFunction(lookupkeys)` is implemented if `isAsyncEnabled` returns true.
+For example, for a Kafka table source factory, the `DeserializationFormatFactory` would return a `DecodingFormat<DeserializationSchema>`
+that can be passed into the Kafka table source.
 
 {% top %}
 
-Define a TableSink
+Full Stack Example
 ------------------
 
-A `TableSink` specifies how to emit a `Table` to an external system or location. The interface is generic such that it can support different storage locations and formats. There are different table sinks for batch tables and streaming tables.
+This section sketches how to implement a scan table source with a decoding format that supports changelog
+semantics. The example illustrates how all of the mentioned components play together. It can serve as
+a reference implementation.
 
-The general interface looks as follows:
+In particular, it shows how to
+- create factories that parse and validate options,
+- implement table connectors,
+- implement and discover custom formats,
+- and use provided utilities such as data structure converters and the `FactoryUtil`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-TableSink<T> {
-
-  public TypeInformation<T> getOutputType();
+The table source uses a simple single-threaded `SourceFunction` to open a socket that listens for incoming
+bytes. The raw bytes are decoded into rows by a pluggable format. The format expects a changelog flag
+as the first column.
 
-  public String[] getFieldNames();
+We will use most of the interfaces mentioned above to enable the following DDL:
 
-  public TypeInformation[] getFieldTypes();
-
-  public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
-}
+{% highlight sql %}
+CREATE TABLE UserScores (name STRING, score INT)
+WITH (
+  'connector' = 'socket',
+  'hostname' = 'localhost',
+  'port' = '9999',
+  'byte-delimiter' = '10',
+  'format' = 'changelog-csv',
+  'changelog-csv.column-delimiter' = '|'
+);
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-TableSink[T] {
-
-  def getOutputType: TypeInformation<T>
 
-  def getFieldNames: Array[String]
+Because the format supports changelog semantics, we are able to ingest updates during runtime and create
+an updating view that can continuously evaluate changing data:
 
-  def getFieldTypes: Array[TypeInformation]
-
-  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
-}
+{% highlight sql %}
+SELECT name, SUM(score) FROM UserScores GROUP BY name;
 {% endhighlight %}
-</div>
-</div>
-
-The `TableSink#configure` method is called to pass the schema of the Table (field names and types) to emit to the `TableSink`. The method must return a new instance of the TableSink which is configured to emit the provided Table schema. Note that the provided `TableSchema` shouldn't contain computed columns to reflect the schema of the physical `TableSink`.
-
-### BatchTableSink
 
-Defines an external `TableSink` to emit a batch table.
-
-The interface looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-BatchTableSink<T> implements TableSink<T> {
-
-  public void emitDataSet(DataSet<T> dataSet);
-}
+Use the following command to ingest data in a terminal:
+{% highlight text %}
+> nc -lk 9999
+INSERT|Alice|12
+INSERT|Bob|5
+DELETE|Alice|12
+INSERT|Alice|18
 {% endhighlight %}
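+
+Assuming the rows arrive in the order shown above, the aggregated view evolves roughly as follows
+(a sketch; the `DELETE` row retracts the score that was previously inserted for Alice):
+
+{% highlight text %}
+INSERT|Alice|12   =>  Alice=12
+INSERT|Bob|5      =>  Alice=12, Bob=5
+DELETE|Alice|12   =>  Bob=5
+INSERT|Alice|18   =>  Alice=18, Bob=5
+{% endhighlight %}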
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-BatchTableSink[T] extends TableSink[T] {
 
-  def emitDataSet(dataSet: DataSet[T]): Unit
-}
-{% endhighlight %}
-</div>
-</div>
+### Factories
 
-{% top %}
+This section illustrates how to translate metadata coming from the catalog to concrete connector instances.
 
-### AppendStreamTableSink
+Both factories have been added to the `META-INF/services` directory.
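+
+For reference, the service entry file for these factories, `META-INF/services/org.apache.flink.table.factories.Factory`,
+simply lists the fully qualified class names of both implementations (the `com.example.socket` package
+below is only a placeholder):
+
+{% highlight text %}
+com.example.socket.SocketDynamicTableFactory
+com.example.socket.ChangelogCsvFormatFactory
+{% endhighlight %}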
 
-Defines an external `TableSink` to emit a streaming table with only insert changes.
+**`SocketDynamicTableFactory`**
 
-The interface looks as follows:
+The `SocketDynamicTableFactory` translates the catalog table to a table source. Because the table source
+requires a decoding format, we discover a suitable format using the provided `FactoryUtil` for convenience.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-AppendStreamTableSink<T> implements TableSink<T> {
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
+
+  // define all options statically
+  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
+    .stringType()
+    .noDefaultValue();
+
+  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
+    .intType()
+    .noDefaultValue();
+
+  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
+    .intType()
+    .defaultValue(10); // corresponds to '\n'
 
-  public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
-}
-{% endhighlight %}
-</div>
+  @Override
+  public String factoryIdentifier() {
+    return "socket"; // used for matching to `connector = '...'`
+  }
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-AppendStreamTableSink[T] extends TableSink[T] {
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(HOSTNAME);
+    options.add(PORT);
+    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
+    return options;
+  }
 
-  def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_]
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(BYTE_DELIMITER);
+    return options;
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    // either implement your custom validation logic here ...
+    // or use the provided helper utility
+    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+    // discover a suitable decoding format
+    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
+      DeserializationFormatFactory.class,
+      FactoryUtil.FORMAT);
+
+    // validate all options
+    helper.validate();
+
+    // get the validated options
+    final ReadableConfig options = helper.getOptions();
+    final String hostname = options.get(HOSTNAME);
+    final int port = options.get(PORT);
+    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
+
+    // derive the produced data type (excluding computed columns) from the catalog table
+    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+    // create and return dynamic table source
+    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
+  }
 }
 {% endhighlight %}
-</div>
-</div>
-
-If the table is also modified by update or delete changes, a `TableException` will be thrown.
 
-{% top %}
+**`ChangelogCsvFormatFactory`**
 
-### RetractStreamTableSink
+The `ChangelogCsvFormatFactory` translates format-specific options to a format. The `FactoryUtil` in `SocketDynamicTableFactory`
+takes care of adapting the option keys accordingly and handles the `changelog-csv.` prefix of options such as `changelog-csv.column-delimiter`.
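+
+Concretely, a format option declared in the DDL reaches `createDecodingFormat(...)` without the format
+prefix:
+
+{% highlight text %}
+'changelog-csv.column-delimiter' = '|'    -- key as written in the CREATE TABLE statement
+column-delimiter = |                      -- key as seen by ChangelogCsvFormatFactory
+{% endhighlight %}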
 
-Defines an external `TableSink` to emit a streaming table with insert, update, and delete changes.
+Because this factory implements `DeserializationFormatFactory`, it could also be used for other connectors
+that support deserialization formats such as the Kafka connector.
 
-The interface looks as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
-
-  public TypeInformation<T> getRecordType();
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
+
+  // define all options statically
+  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
+    .stringType()
+    .defaultValue("|");
 
-  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
-}
-{% endhighlight %}
-</div>
+  @Override
+  public String factoryIdentifier() {
+    return "changelog-csv";
+  }
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.emptySet();
+  }
 
-  def getRecordType: TypeInformation[T]
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(COLUMN_DELIMITER);
+    return options;
+  }
 
-  def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
+  @Override
+  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+      DynamicTableFactory.Context context,
+      ReadableConfig formatOptions) {
+    // get the validated options
+    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
+
+    // create and return the format
+    return new ChangelogCsvFormat(columnDelimiter);
+  }
 }
 {% endhighlight %}
-</div>
-</div>
 
-The table will be converted into a stream of accumulate and retraction messages which are encoded as Java `Tuple2`. The first field is a boolean flag to indicate the message type (`true` indicates insert, `false` indicates delete). The second field holds the record of the requested type `T`.
+### Table Source and Decoding Format
 
-{% top %}
-
-### UpsertStreamTableSink
+This section illustrates how to translate from instances of the planning layer to runtime instances that
+are shipped to the cluster.
 
-Defines an external `TableSink` to emit a streaming table with insert, update, and delete changes.
+**`SocketDynamicTableSource`**
 
-The interface looks as follows:
+The `SocketDynamicTableSource` is used during planning. In our example, we don't implement any of the
+available ability interfaces. Therefore, the main logic can be found in `getScanRuntimeProvider(...)`
+where we instantiate the required `SourceFunction` and its `DeserializationSchema` for runtime. Both
+instances are parameterized to return internal data structures (i.e. `RowData`).
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
-
-  public void setKeyFields(String[] keys);
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+public class SocketDynamicTableSource implements ScanTableSource {
+
+  private final String hostname;
+  private final int port;
+  private final byte byteDelimiter;
+  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+  private final DataType producedDataType;
+
+  public SocketDynamicTableSource(
+      String hostname,
+      int port,
+      byte byteDelimiter,
+      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+      DataType producedDataType) {
+    this.hostname = hostname;
+    this.port = port;
+    this.byteDelimiter = byteDelimiter;
+    this.decodingFormat = decodingFormat;
+    this.producedDataType = producedDataType;
+  }
 
-  public void setIsAppendOnly(boolean isAppendOnly);
+  @Override
+  public ChangelogMode getChangelogMode() {
+    // in our example the format decides about the changelog mode
+    // but it could also be the source itself
+    return decodingFormat.getChangelogMode();
+  }
 
-  public TypeInformation<T> getRecordType();
+  @Override
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 
-  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
-}
-{% endhighlight %}
-</div>
+    // create runtime classes that are shipped to the cluster
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
+    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
+      runtimeProviderContext,
+      producedDataType);
 
-  def setKeyFields(keys: Array[String]): Unit
+    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
+      hostname,
+      port,
+      byteDelimiter,
+      deserializer);
 
-  def setIsAppendOnly(isAppendOnly: Boolean): Unit
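+    // wrap the source function in a provider; 'false' indicates that the source is unbounded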
+    return SourceFunctionProvider.of(sourceFunction, false);
+  }
 
-  def getRecordType: TypeInformation[T]
+  @Override
+  public DynamicTableSource copy() {
+    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
+  }
 
-  def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
+  @Override
+  public String asSummaryString() {
+    return "Socket Table Source";
+  }
 }
 {% endhighlight %}
-</div>
-</div>
-
-The table must be have unique key fields (atomic or composite) or be append-only. If the table does not have a unique key and is not append-only, a `TableException` will be thrown. The unique key of the table is configured by the `UpsertStreamTableSink#setKeyFields()` method.
 
-The table will be converted into a stream of upsert and delete messages which are encoded as a Java `Tuple2`. The first field is a boolean flag to indicate the message type. The second field holds the record of the requested type `T`.
+**`ChangelogCsvFormat`**
 
-A message with true boolean field is an upsert message for the configured key. A message with false flag is a delete message for the configured key. If the table is append-only, all messages will have a true flag and must be interpreted as insertions.
+The `ChangelogCsvFormat` is a decoding format that uses a `DeserializationSchema` during runtime. It
+supports emitting `INSERT` and `DELETE` changes.
 
-{% top %}
-
-Define a TableFactory
----------------------
-
-A `TableFactory` allows to create different table-related instances from string-based properties. All available factories are called for matching to the given set of properties and a corresponding factory class.
-
-Factories leverage Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) for discovering. This means that every dependency and JAR file should contain a file `org.apache.flink.table.factories.TableFactory` in the `META_INF/services` resource directory that lists all available table factories that it provides.
-
-Every table factory needs to implement the following interface:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-package org.apache.flink.table.factories;
-
-interface TableFactory {
-
-  Map<String, String> requiredContext();
-
-  List<String> supportedProperties();
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-package org.apache.flink.table.factories
-
-trait TableFactory {
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+
+public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+  private final String columnDelimiter;
+
+  public ChangelogCsvFormat(String columnDelimiter) {
+    this.columnDelimiter = columnDelimiter;
+  }
 
-  def requiredContext(): util.Map[String, String]
+  @Override
+  @SuppressWarnings("unchecked")
+  public DeserializationSchema<RowData> createRuntimeDecoder(
+      DynamicTableSource.Context context,
+      DataType producedDataType) {
+    // create type information for the DeserializationSchema
+    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
+      producedDataType);
+
+    // most of the code in DeserializationSchema will not work on internal data structures
+    // create a converter for conversion at the end
+    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
+
+    // use logical types during runtime for parsing
+    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
+
+    // create runtime class
+    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
+  }
 
-  def supportedProperties(): util.List[String]
+  @Override
+  public ChangelogMode getChangelogMode() {
+    // define that this format can produce INSERT and DELETE rows
+    return ChangelogMode.newBuilder()
+      .addContainedKind(RowKind.INSERT)
+      .addContainedKind(RowKind.DELETE)
+      .build();
+  }
 }
 {% endhighlight %}
-</div>
-</div>
-
-* `requiredContext()`: Specifies the context that this factory has been implemented for. The framework guarantees to only match for this factory if the specified set of properties and values are met. Typical properties might be `connector.type`, `format.type`, or `update-mode`. Property keys such as `connector.property-version` and `format.property-version` are reserved for future backwards compatibility cases.
-* `supportedProperties()`: List of property keys that this factory can handle. This method will be used for validation. If a property is passed that this factory cannot handle, an exception will be thrown. The list must not contain the keys that are specified by the context.
 
-In order to create a specific instance, a factory class can implement one or more interfaces provided in `org.apache.flink.table.factories`:
+### Runtime
 
-* `BatchTableSourceFactory`: Creates a batch table source.
-* `BatchTableSinkFactory`: Creates a batch table sink.
-* `StreamTableSourceFactory`: Creates a stream table source.
-* `StreamTableSinkFactory`: Creates a stream table sink.
-* `DeserializationSchemaFactory`: Creates a deserialization schema format.
-* `SerializationSchemaFactory`: Creates a serialization schema format.
+For completeness, this section illustrates the runtime logic for both `SourceFunction` and `DeserializationSchema`.
 
-The discovery of a factory happens in multiple stages:
+**`ChangelogCsvDeserializer`**
 
-- Discover all available factories.
-- Filter by factory class (e.g., `StreamTableSourceFactory`).
-- Filter by matching context.
-- Filter by supported properties.
-- Verify that exactly one factory matches, otherwise throw an `AmbiguousTableFactoryException` or `NoMatchingTableFactoryException`.
+The `ChangelogCsvDeserializer` contains simple parsing logic for converting bytes into a `Row` of `Integer`
+and `String` with a row kind. The final conversion step converts those into internal data structures.
 
-The following example shows how to provide a custom streaming source with an additional `connector.debug` property flag for parameterization.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.RuntimeConverter.Context;
+import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.Row;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {
-
-  @Override
-  public Map<String, String> requiredContext() {
-    Map<String, String> context = new HashMap<>();
-    context.put("update-mode", "append");
-    context.put("connector.type", "my-system");
-    return context;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
+
+  private final List<LogicalType> parsingTypes;
+  private final DataStructureConverter converter;
+  private final TypeInformation<RowData> producedTypeInfo;
+  private final String columnDelimiter;
+
+  public ChangelogCsvDeserializer(
+      List<LogicalType> parsingTypes,
+      DataStructureConverter converter,
+      TypeInformation<RowData> producedTypeInfo,
+      String columnDelimiter) {
+    this.parsingTypes = parsingTypes;
+    this.converter = converter;
+    this.producedTypeInfo = producedTypeInfo;
+    this.columnDelimiter = columnDelimiter;
   }
 
   @Override
-  public List<String> supportedProperties() {
-    List<String> list = new ArrayList<>();
-    list.add("connector.debug");
-    return list;
+  public TypeInformation<RowData> getProducedType() {
+    // return the type information required by Flink's core interfaces
+    return producedTypeInfo;
   }
 
   @Override
-  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
-    boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));
-
-    # additional validation of the passed properties can also happen here
-
-    return new MySystemAppendTableSource(isDebug);
+  public void open(InitializationContext context) {
+    // converters must be opened before they can be used
+    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
   }
-}
-{% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-import java.util
-import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.types.Row
-
-class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put("update-mode", "append")
-    context.put("connector.type", "my-system")
-    context
+  @Override
+  public RowData deserialize(byte[] message) {
+    // parse the columns including a changelog flag
+    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
+    final RowKind kind = RowKind.valueOf(columns[0]);
+    final Row row = new Row(kind, parsingTypes.size());
+    for (int i = 0; i < parsingTypes.size(); i++) {
+      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
+    }
+    // convert to internal data structure
+    return (RowData) converter.toInternal(row);
   }
 
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add("connector.debug")
-    properties
+  private static Object parse(LogicalTypeRoot root, String value) {
+    switch (root) {
+      case INTEGER:
+        return Integer.parseInt(value);
+      case VARCHAR:
+        return value;
+      default:
+        throw new IllegalArgumentException();
+    }
   }
 
-  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
-    val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))
-
-    # additional validation of the passed properties can also happen here
-
-    new MySystemAppendTableSource(isDebug)
+  @Override
+  public boolean isEndOfStream(RowData nextElement) {
+    return false;
   }
 }
 {% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Use a TableFactory in the SQL Client
-
-In a SQL Client environment file, the previously presented factory could be declared as:
-
-{% highlight yaml %}
-tables:
- - name: MySystemTable
-   type: source
-   update-mode: append
-   connector:
-     type: my-system
-     debug: true
-{% endhighlight %}
-
-The YAML file is translated into flattened string properties and a table factory is called with those properties that describe the connection to the external system:
-
-{% highlight text %}
-update-mode=append
-connector.type=my-system
-connector.debug=true
-{% endhighlight %}
-
-<span class="label label-danger">Attention</span> Properties such as `tables.#.name` or `tables.#.type` are SQL Client specifics and are not passed to any factory. The `type` property decides, depending on the execution environment, whether a `BatchTableSourceFactory`/`StreamTableSourceFactory` (for `source`), a `BatchTableSinkFactory`/`StreamTableSinkFactory` (for `sink`), or both (for `both`) need to discovered.
-
-{% top %}
 
-### Use a TableFactory in the Table & SQL API
+**`SocketSourceFunction`**
 
-For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+The `SocketSourceFunction` opens a socket and consumes bytes. It splits records by the given byte
+delimiter (`\n` by default) and delegates the decoding to a pluggable `DeserializationSchema`. The
+source function can only work with a parallelism of 1.
 
 {% highlight java %}
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
-  * Connector to MySystem with debug mode.
-  */
-public class MySystemConnector extends ConnectorDescriptor {
-
-  public final boolean isDebug;
-
-  public MySystemConnector(boolean isDebug) {
-    super("my-system", 1, false);
-    this.isDebug = isDebug;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
+
+  private final String hostname;
+  private final int port;
+  private final byte byteDelimiter;
+  private final DeserializationSchema<RowData> deserializer;
+
+  private volatile boolean isRunning = true;
+  private Socket currentSocket;
+
+  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
+    this.hostname = hostname;
+    this.port = port;
+    this.byteDelimiter = byteDelimiter;
+    this.deserializer = deserializer;
   }
 
   @Override
-  protected Map<String, String> toConnectorProperties() {
-    Map<String, String> properties = new HashMap<>();
-    properties.put("connector.debug", Boolean.toString(isDebug));
-    return properties;
+  public TypeInformation<RowData> getProducedType() {
+    return deserializer.getProducedType();
   }
-}
-{% endhighlight %}
-
-The descriptor can then be used to create a table with the table environment.
-
-{% highlight java %}
-StreamTableEnvironment tableEnv = // ...
-
-tableEnv
-  .connect(new MySystemConnector(true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable");
-{% endhighlight %}
-
-</div>
 
-<div data-lang="scala" markdown="1">
-
-A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
-
-{% highlight scala %}
-import org.apache.flink.table.descriptors.ConnectorDescriptor
-import java.util.HashMap
-import java.util.Map
+  @Override
+  public void open(Configuration parameters) throws Exception {
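+    // open the deserializer; the lambda exposes the runtime's metric group to the schema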
+    deserializer.open(() -> getRuntimeContext().getMetricGroup());
+  }
 
-/**
-  * Connector to MySystem with debug mode.
-  */
-class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {
+    while (isRunning) {
+      // open and consume from socket
+      try (final Socket socket = new Socket()) {
+        currentSocket = socket;
+        socket.connect(new InetSocketAddress(hostname, port), 0);
+        try (InputStream stream = socket.getInputStream()) {
+          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+          int b;
+          while ((b = stream.read()) >= 0) {
+            // buffer until delimiter
+            if (b != byteDelimiter) {
+              buffer.write(b);
+            }
+            // decode and emit record
+            else {
+              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
+              buffer.reset();
+            }
+          }
+        }
+      } catch (Throwable t) {
+        t.printStackTrace(); // print and continue
+      }
+      Thread.sleep(1000);
+    }
+  }
 
-  override protected def toConnectorProperties(): Map[String, String] = {
-    val properties = new HashMap[String, String]
-    properties.put("connector.debug", isDebug.toString)
-    properties
+  @Override
+  public void cancel() {
+    isRunning = false;
+    try {
+      currentSocket.close();
+    } catch (Throwable t) {
+      // ignore
+    }
   }
 }
 {% endhighlight %}
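+
+Since the function is not parallelizable, a defensive check could be added by replacing the `open(...)`
+method above with the following variant (a sketch, not part of the reference implementation):
+
+{% highlight java %}
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // fail fast if the source was accidentally configured with a higher parallelism
+    if (getRuntimeContext().getNumberOfParallelSubtasks() != 1) {
+      throw new IllegalStateException("The SocketSourceFunction must run with a parallelism of 1.");
+    }
+    deserializer.open(() -> getRuntimeContext().getMetricGroup());
+  }
+{% endhighlight %}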
 
-The descriptor can then be used to create a table with the table environment.
-
-{% highlight scala %}
-val tableEnv: StreamTableEnvironment = // ...
-
-tableEnv
-  .connect(new MySystemConnector(isDebug = true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable")
-{% endhighlight %}
-
-</div>
-
-<div data-lang="python" markdown="1">
-
-You can use a Java `TableFactory` from Python using the `CustomConnectorDescriptor`.
-
-{% highlight python %}
-s_env = StreamExecutionEnvironment.get_execution_environment()
-st_env = StreamTableEnvironment.create(s_env)
-
-custom_connector = CustomConnectorDescriptor('my-system', 1, False)
-st_env\
-    .connect(custom_connector.property("connector.debug", "true")) \
-    .with_schema(...) \
-    .in_append_mode()\
-    .create_temporary_table("MySystemTable")
-{% endhighlight %}
-</div>
-
-</div>
-
 {% top %}