Posted to commits@flink.apache.org by se...@apache.org on 2020/06/09 18:37:51 UTC

[flink] branch release-1.11 updated (435ec27 -> 1df350c)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 435ec27  [FLINK-16225] Implement user class loading exception handler
     new c6f4de3  [FLINK-10740][docs] Add documentation for FLIP-27 sources
     new 846ae78  [FLINK-10740][docs] Add documentation for FLIP-27 Source API and SplitReader API.
     new 2d75b19  [hotfix][docs] Add NEWLINE to end of SVG files
     new 1df350c  [hotfix][docs] Minor language cleanups for Data Source docs.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/sources.md        | 391 ++++++++++++++++++++++++++++++++++++++
 docs/dev/stream/sources.zh.md     | 391 ++++++++++++++++++++++++++++++++++++++
 docs/fig/per_split_watermarks.svg |  20 ++
 docs/fig/source_components.svg    |  20 ++
 docs/fig/source_reader.svg        |  20 ++
 5 files changed, 842 insertions(+)
 create mode 100644 docs/dev/stream/sources.md
 create mode 100644 docs/dev/stream/sources.zh.md
 create mode 100644 docs/fig/per_split_watermarks.svg
 create mode 100644 docs/fig/source_components.svg
 create mode 100644 docs/fig/source_reader.svg


[flink] 03/04: [hotfix][docs] Add NEWLINE to end of SVG files

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2d75b19d78ae00e2ac7bc1ae1cab88779760819e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jun 9 18:20:09 2020 +0200

    [hotfix][docs] Add NEWLINE to end of SVG files
---
 docs/fig/per_split_watermarks.svg | 2 +-
 docs/fig/source_components.svg    | 2 +-
 docs/fig/source_reader.svg        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/fig/per_split_watermarks.svg b/docs/fig/per_split_watermarks.svg
index cc5371c..8612584 100644
--- a/docs/fig/per_split_watermarks.svg
+++ b/docs/fig/per_split_watermarks.svg
@@ -17,4 +17,4 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<svg width="916" height="303" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M192-10 1108-10 1108 293 192 293Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-192 10)"><path d="M490 109C490 43.8304 542.383-9 607-9 671.617-9 724 43.8304 724 109 724 174.17 671.617 227 607 227 542.383 227 490 174.17 490 109Z" fill="#AECBCC" fill-rule="evenodd"/><t [...]
\ No newline at end of file
+<svg width="916" height="303" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M192-10 1108-10 1108 293 192 293Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-192 10)"><path d="M490 109C490 43.8304 542.383-9 607-9 671.617-9 724 43.8304 724 109 724 174.17 671.617 227 607 227 542.383 227 490 174.17 490 109Z" fill="#AECBCC" fill-rule="evenodd"/><t [...]
diff --git a/docs/fig/source_components.svg b/docs/fig/source_components.svg
index 417b7ce..8c148fe 100644
--- a/docs/fig/source_components.svg
+++ b/docs/fig/source_components.svg
@@ -17,4 +17,4 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<svg width="855" height="487" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M26 15 881 15 881 502 26 502Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-26 -15)"><path d="M26 158.936C26 151.239 32.2394 145 39.936 145L326.064 145C333.761 145 340 151.239 340 158.936L340 298.064C340 305.761 333.761 312 326.064 312L39.936 312C32.2394 312 26 305.7 [...]
\ No newline at end of file
+<svg width="855" height="487" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M26 15 881 15 881 502 26 502Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-26 -15)"><path d="M26 158.936C26 151.239 32.2394 145 39.936 145L326.064 145C333.761 145 340 151.239 340 158.936L340 298.064C340 305.761 333.761 312 326.064 312L39.936 312C32.2394 312 26 305.7 [...]
diff --git a/docs/fig/source_reader.svg b/docs/fig/source_reader.svg
index 1d4d9ac..1d0f363 100644
--- a/docs/fig/source_reader.svg
+++ b/docs/fig/source_reader.svg
@@ -17,4 +17,4 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<svg width="1257" height="653" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M-11 41 1246 41 1246 694-11 694Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip1"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip2"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath>< [...]
\ No newline at end of file
+<svg width="1257" height="653" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M-11 41 1246 41 1246 694-11 694Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip1"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip2"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath>< [...]


[flink] 04/04: [hotfix][docs] Minor language cleanups for Data Source docs.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1df350cb6d44cffc83d71a0acf32a1e9ea44c93a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jun 9 20:00:45 2020 +0200

    [hotfix][docs] Minor language cleanups for Data Source docs.
---
 docs/dev/stream/sources.md    | 23 ++++++++++++++++-------
 docs/dev/stream/sources.zh.md | 23 ++++++++++++++++-------
 2 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 040318e..3f20388 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -209,22 +209,30 @@ val stream = env.continuousSource(
 </div>
 </div>
 
+----
+----
+
 ## The Split Reader API
-Core SourceReader API is asynchronous and rather low level, leaving users to handle splits manually.
-In practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the API for such high level API.
-The `SplitReader` API is designed work together with `SourceReaderBase` class which is a base implementation for the `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+The core SourceReader API is fully asynchronous and requires implementations to manage asynchronous split reading manually.
+However, in practice, most sources perform blocking operations, like blocking *poll()* calls on clients (for example the `KafkaConsumer`), or blocking I/O operations on distributed file systems (HDFS, S3, ...). To make this compatible with the asynchronous Source API, these blocking (synchronous) operations need to happen in separate threads, which hand over the data to the asynchronous part of the reader.
+
+The [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the high-level API for simple synchronous reading/polling-based source implementations, like file reading, Kafka, etc.
+
+The core is the `SourceReaderBase` class, which takes a `SplitReader` and creates fetcher threads running the SplitReader, supporting different consumption threading models.
 
 ### SplitReader
+
 The `SplitReader` API only has three methods:
   - A blocking fetch method to return a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java)
   - A non-blocking method to handle split changes.
-  - A non-blocking wake up method to wake up the blocking fetch.
+  - A non-blocking wake up method to wake up the blocking fetch operation.
 
 The `SplitReader` only focuses on reading records from the external system and is therefore much simpler than the `SourceReader`.
 Please check the Java doc of the class for more details.
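 
 As a rough, hedged sketch (this is not the actual interface; the exact signatures are defined by the `SplitReader` class linked above, and names like `MyRecord`, `MySplit`, `MyClient` and the helper `toRecordsWithSplitIds()` are made up), a polling-based implementation could be shaped like this:
 
 {% highlight java %}
 class MySplitReader {
 
     private final MyClient client = new MyClient();
 
     // 1) Blocking fetch: polls the external system and returns the fetched
     //    records, grouped by the split they were read from.
     public RecordsWithSplitIds<MyRecord> fetch() {
         return toRecordsWithSplitIds(client.poll());
     }
 
     // 2) Non-blocking: called when splits are assigned to this reader, e.g. to
     //    update the client's subscriptions and start positions.
     public void handleSplitsChanges(List<MySplit> newSplits) {
         client.subscribe(newSplits);
     }
 
     // 3) Non-blocking: interrupts a fetch that is currently blocked in poll().
     public void wakeUp() {
         client.interruptPoll();
     }
 }
 {% endhighlight %}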
 
 ### SourceReaderBase
+
 It is quite common that a `SourceReader` implementation does the following:
 
   - Have a pool of threads fetching from splits of the external system in a blocking way.
@@ -236,6 +244,7 @@ In order to reduce the work of writing a new `SourceReader`, Flink provides a [S
 `SourceReaderBase` has all the above work done out of the box. To write a new `SourceReader`, one can just let the `SourceReader` implementation inherit from the `SourceReaderBase`, fill in a few methods and implement a high level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
 ### SplitFetcherManager
+
 The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
 The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.
 
@@ -349,7 +358,7 @@ environment.continuousSource(
     String sourceName)
 {% endhighlight %}
 
-The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement and timestamp extraction and watermark generation code.
+The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput` (or `SourceOutput`), so source implementors do not have to implement any timestamp extraction and watermark generation code.
 
 #### Event Timestamps
 
@@ -379,4 +388,4 @@ The data source API supports running watermark generators individually *per spli
 
 When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.
 
-For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must ouput events from different splits to different outputs, the *split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]
+For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must output events from different splits to different outputs: the *Split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]
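+
+As a hedged sketch (the record type and its getter are made up; `createOutputForSplit`, `releaseOutputForSplit` and `collect` are the methods mentioned above), emitting through split-local outputs inside a custom `SourceReader` could look like this:
+
+{% highlight java %}
+// Obtain (or create) the output dedicated to the split this record came from,
+// so that watermarks are generated and tracked per split.
+SourceOutput<MyRecord> splitOutput = readerOutput.createOutputForSplit(split.splitId());
+splitOutput.collect(record, record.getTimestamp());
+
+// When the split is finished, release its split-local output again.
+readerOutput.releaseOutputForSplit(split.splitId());
+{% endhighlight %}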
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 040318e..3f20388 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -209,22 +209,30 @@ val stream = env.continuousSource(
 </div>
 </div>
 
+----
+----
+
 ## The Split Reader API
-Core SourceReader API is asynchronous and rather low level, leaving users to handle splits manually.
-In practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the API for such high level API.
-The `SplitReader` API is designed work together with `SourceReaderBase` class which is a base implementation for the `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+The core SourceReader API is fully asynchronous and requires implementations to manage asynchronous split reading manually.
+However, in practice, most sources perform blocking operations, like blocking *poll()* calls on clients (for example the `KafkaConsumer`), or blocking I/O operations on distributed file systems (HDFS, S3, ...). To make this compatible with the asynchronous Source API, these blocking (synchronous) operations need to happen in separate threads, which hand over the data to the asynchronous part of the reader.
+
+The [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the high-level API for simple synchronous reading/polling-based source implementations, like file reading, Kafka, etc.
+
+The core is the `SourceReaderBase` class, which takes a `SplitReader` and creates fetcher threads running the SplitReader, supporting different consumption threading models.
 
 ### SplitReader
+
 The `SplitReader` API only has three methods:
   - A blocking fetch method to return a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java)
   - A non-blocking method to handle split changes.
-  - A non-blocking wake up method to wake up the blocking fetch.
+  - A non-blocking wake up method to wake up the blocking fetch operation.
 
 The `SplitReader` only focuses on reading records from the external system and is therefore much simpler than the `SourceReader`.
 Please check the Java doc of the class for more details.
 
 ### SourceReaderBase
+
 It is quite common that a `SourceReader` implementation does the following:
 
   - Have a pool of threads fetching from splits of the external system in a blocking way.
@@ -236,6 +244,7 @@ In order to reduce the work of writing a new `SourceReader`, Flink provides a [S
 `SourceReaderBase` has all the above work done out of the box. To write a new `SourceReader`, one can just let the `SourceReader` implementation inherit from the `SourceReaderBase`, fill in a few methods and implement a high level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
 ### SplitFetcherManager
+
 The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
 The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.
 
@@ -349,7 +358,7 @@ environment.continuousSource(
     String sourceName)
 {% endhighlight %}
 
-The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement and timestamp extraction and watermark generation code.
+The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput` (or `SourceOutput`), so source implementors do not have to implement any timestamp extraction and watermark generation code.
 
 #### Event Timestamps
 
@@ -379,4 +388,4 @@ The data source API supports running watermark generators individually *per spli
 
 When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.
 
-For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must ouput events from different splits to different outputs, the *split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]
+For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must output events from different splits to different outputs: the *Split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]


[flink] 01/04: [FLINK-10740][docs] Add documentation for FLIP-27 sources

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6f4de38aa9cf3e02bd0d0f8955215687fad8940
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Jun 4 13:41:06 2020 +0200

    [FLINK-10740][docs] Add documentation for FLIP-27 sources
    
    This closes #12492
---
 docs/dev/stream/sources.md        | 168 ++++++++++++++++++++++++++++++++++++++
 docs/dev/stream/sources.zh.md     | 168 ++++++++++++++++++++++++++++++++++++++
 docs/fig/per_split_watermarks.svg |  20 +++++
 docs/fig/source_components.svg    |  20 +++++
 4 files changed, 376 insertions(+)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
new file mode 100644
index 0000000..0257661
--- /dev/null
+++ b/docs/dev/stream/sources.md
@@ -0,0 +1,168 @@
+---
+title: "Data Sources"
+nav-title: "Data Sources"
+nav-parent_id: streaming
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+* This will be replaced by the TOC
+{:toc}
+
+<div class="alert alert-warning">
+  <p><strong>Note:</strong> This describes the new Data Source API, introduced in Flink 1.11 as part of <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>.
+  This new API is currently in <strong>BETA</strong> status.</p>
+  <p>Most of the existing source connectors are not yet (as of Flink 1.11) implemented using this new API,
+  but using the previous API, based on <a href="https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java">SourceFunction</a>.</p>
+</div>
+
+
+This page describes Flink's Data Source API and the concepts and architecture behind it.
+**Read this, if you are interested in how data sources in Flink work, or if you want to implement a new Data Source.**
+
+If you are looking for pre-defined source connectors, please check the [Connector Docs]({{ site.baseurl }}/dev/connectors/).
+
+
+## Data Source Concepts
+
+**Core Components**
+
+A Data Source has three core components: *Splits*, the *SplitEnumerator*, and the *SourceReader*.
+
+  - A **Split** is a portion of data consumed by the source, like a file or a log partition. Splits are the granularity by which the source distributes the work and parallelizes the data reading.
+
+  - The **SourceReader** requests *Splits* and processes them, for example by reading the file or log partition represented by the *Split*. The *SourceReaders* run in parallel on the Task Managers in the `SourceOperators` and produce the parallel stream of events/records.
+
+  - The **SplitEnumerator** generates the *Splits* and assigns them to the *SourceReaders*. It runs as a single instance on the Job Manager and is responsible for maintaining the backlog of pending *Splits* and assigning them to the readers in a balanced manner.
+  
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) class is the API entry point that ties the above three components together.
+
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_components.svg" alt="Illustration of SplitEnumerator and SourceReader interacting." />
+</div>
+
+
+**Unified Across Streaming and Batch**
+
+The Data Source API supports both unbounded streaming sources and bounded batch sources, in a unified way.
+
+The difference between both cases is minimal: In the bounded/batch case, the enumerator generates a fixed set of splits, and each split is necessarily finite. In the unbounded streaming case, one of the two is not true (splits are not finite, or the enumerator keeps generating new splits).
+
+#### Examples
+
+Here are some simplified conceptual examples to illustrate how the data source components interact, in streaming and batch cases.
+
+*Note that this does not accurately describe how the Kafka and File source implementations work; parts are simplified, for illustrative purposes.*
+
+**Bounded File Source**
+
+The source has the URI/Path of a directory to read, and a *Format* that defines how to parse the files.
+
+  - A *Split* is a file, or a region of a file (if the data format supports splitting the file).
+  - The *SplitEnumerator* lists all files under the given directory path. It assigns Splits to the next reader that requests a Split. Once all Splits are assigned, it responds to requests with *NoMoreSplits*.
+  - The *SourceReader* requests a Split and reads the assigned Split (file or file region) and parses it using the given Format. If it does not get another Split, but a *NoMoreSplits* message, it finishes.
+
+**Unbounded Streaming File Source**
+
+This source works the same way as described above, except that the *SplitEnumerator* never responds with *NoMoreSplits* and periodically lists the contents under the given URI/Path to check for new files. Once it finds new files, it generates new Splits for them and can assign them to the available SourceReaders.
+
+**Unbounded Streaming Kafka Source**
+
+The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deserializer* to parse the records.
+
+  - A *Split* is a Kafka Topic Partition.
+  - The *SplitEnumerator* connects to the brokers to list all topic partitions involved in the subscribed topics. The enumerator can optionally repeat this operation to discover newly added topics/partitions.
+  - The *SourceReader* reads the assigned Splits (Topic Partitions) using the KafkaConsumer and deserializes the records using the provided Deserializer. The splits (Topic Partitions) do not have an end, so the reader never reaches the end of the data.
+
+**Bounded Kafka Source**
+
+Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.
+
+----
+----
+
+## The Data Source API
+
+Source, SourceEnumerator, SourceReader.
+
+env.continuousSource(...).
+
+----
+----
+
+## The Split Reader API
+
+
+core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
+in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
+Split Reader is base implementation for that.
+
+----
+----
+
+## Event Time and Watermarks
+
+*Event Time* assignment and *Watermark Generation* happen as part of the data sources. The event streams leaving the Source Readers have event timestamps and (during streaming execution) contain watermarks. See [Timely Stream Processing]({{ site.baseurl }}/concepts/timely-stream-processing.html) for an introduction to Event Time and Watermarks.
+
+<span class="label label-danger">Important</span> Applications based on the legacy [SourceFunction](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java) typically generate timestamps and watermarks in a separate later step via `stream.assignTimestampsAndWatermarks(WatermarkStrategy)`. This function should not be used with the new sources, because timestamps will be already assigned, and it will [...]
+
+#### API
+
+The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
+
+{% highlight java %}
+environment.continuousSource(
+    Source<OUT, ?, ?> source,
+    WatermarkStrategy<OUT> timestampsAndWatermarks,
+    String sourceName)
+{% endhighlight %}
+
+The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement and timestamp extraction and watermark generation code.
+
+#### Event Timestamps
+
+Event timestamps are assigned in two steps:
+
+  1. The SourceReader may attach the *source record timestamp* to the event, by calling `SourceOutput.collect(event, timestamp)`.
+     This is relevant only for data sources that are record-based and have timestamps, such as Kafka, Kinesis, Pulsar, or Pravega.
+     Sources that are not based on records with timestamps (like files) do not have a *source record timestamp*.
+     This step is part of the source connector implementation and not parameterized by the application that uses the source.
+
+  2. The `TimestampAssigner`, which is configured by the application, assigns the final timestamp.
+     The `TimestampAssigner` sees the original *source record timestamp* and the event. The assigner can use the *source record timestamp* or access a field of the event to obtain the final event timestamp.
+  
+This two-step approach allows users to reference both timestamps from the source systems and timestamps in the event's data as the event timestamp.
+
+*Note:* When using a data source without *source record timestamps* (like files) and selecting the *source record timestamp* as the final event timestamp, events will get a default timestamp equal to `LONG_MIN` *(=-9,223,372,036,854,775,808)*.
+
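+As an illustration (a hedged sketch; `mySource`, `MyEvent` and `getCreationTime()` are made-up names), a `WatermarkStrategy` that extracts the final event timestamp from a field of the event could be passed to the source like this:
+
+{% highlight java %}
+DataStream<MyEvent> stream = environment.continuousSource(
+    mySource,
+    WatermarkStrategy
+        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withTimestampAssigner((event, sourceRecordTimestamp) -> event.getCreationTime()),
+    "MySourceWithEventTime");
+{% endhighlight %}
+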
+#### Watermark Generation
+
+Watermark Generators are only active during streaming execution. Batch execution deactivates Watermark Generators; all related operations described below become effectively no-ops.
+
+The data source API supports running watermark generators individually *per split*. That allows Flink to observe the event time progress per split individually, which is important to handle *event time skew* properly and prevent *idle partitions* from holding back the event time progress of the entire application.
+
+<div style="text-align: center">
+  <img width="80%" src="{{ site.baseurl }}/fig/per_split_watermarks.svg" alt="Watermark Generation in a Source with two Splits." />
+</div>
+
+When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.
+
+For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must ouput events from different splits to different outputs, the *split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
new file mode 100644
index 0000000..0257661
--- /dev/null
+++ b/docs/dev/stream/sources.zh.md
@@ -0,0 +1,168 @@
+---
+title: "Data Sources"
+nav-title: "Data Sources"
+nav-parent_id: streaming
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+* This will be replaced by the TOC
+{:toc}
+
+<div class="alert alert-warning">
+  <p><strong>Note:</strong> This describes the new Data Source API, introduced in Flink 1.11 as part of <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>.
+  This new API is currently in <strong>BETA</strong> status.</p>
+  <p>Most of the existing source connectors are not yet (as of Flink 1.11) implemented using this new API,
+  but using the previous API, based on <a href="https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java">SourceFunction</a>.</p>
+</div>
+
+
+This page describes Flink's Data Source API and the concepts and architecture behind it.
+**Read this, if you are interested in how data sources in Flink work, or if you want to implement a new Data Source.**
+
+If you are looking for pre-defined source connectors, please check the [Connector Docs]({{ site.baseurl }}/dev/connectors/).
+
+
+## Data Source Concepts
+
+**Core Components**
+
+A Data Source has three core components: *Splits*, the *SplitEnumerator*, and the *SourceReader*.
+
+  - A **Split** is a portion of data consumed by the source, like a file or a log partition. Splits are the granularity by which the source distributes the work and parallelizes the data reading.
+
+  - The **SourceReader** requests *Splits* and processes them, for example by reading the file or log partition represented by the *Split*. The *SourceReaders* run in parallel on the Task Managers in the `SourceOperators` and produce the parallel stream of events/records.
+
+  - The **SplitEnumerator** generates the *Splits* and assigns them to the *SourceReaders*. It runs as a single instance on the Job Manager and is responsible for maintaining the backlog of pending *Splits* and assigning them to the readers in a balanced manner.
+  
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) class is the API entry point that ties the above three components together.
+
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_components.svg" alt="Illustration of SplitEnumerator and SourceReader interacting." />
+</div>
+
+
+**Unified Across Streaming and Batch**
+
+The Data Source API supports both unbounded streaming sources and bounded batch sources, in a unified way.
+
+The difference between both cases is minimal: In the bounded/batch case, the enumerator generates a fixed set of splits, and each split is necessarily finite. In the unbounded streaming case, one of the two is not true (splits are not finite, or the enumerator keeps generating new splits).
+
+#### Examples
+
+Here are some simplified conceptual examples to illustrate how the data source components interact, in streaming and batch cases.
+
+*Note that this does not accurately describe how the Kafka and File source implementations work; parts are simplified, for illustrative purposes.*
+
+**Bounded File Source**
+
+The source has the URI/Path of a directory to read, and a *Format* that defines how to parse the files.
+
+  - A *Split* is a file, or a region of a file (if the data format supports splitting the file).
+  - The *SplitEnumerator* lists all files under the given directory path. It assigns Splits to the next reader that requests a Split. Once all Splits are assigned, it responds to requests with *NoMoreSplits*.
+  - The *SourceReader* requests a Split and reads the assigned Split (file or file region) and parses it using the given Format. If it does not get another Split, but a *NoMoreSplits* message, it finishes.
+
+**Unbounded Streaming File Source**
+
+This source works the same way as described above, except that the *SplitEnumerator* never responds with *NoMoreSplits* and periodically lists the contents under the given URI/Path to check for new files. Once it finds new files, it generates new Splits for them and can assign them to the available SourceReaders.
+
+**Unbounded Streaming Kafka Source**
+
+The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deserializer* to parse the records.
+
+  - A *Split* is a Kafka Topic Partition.
+  - The *SplitEnumerator* connects to the brokers to list all topic partitions involved in the subscribed topics. The enumerator can optionally repeat this operation to discover newly added topics/partitions.
+  - The *SourceReader* reads the assigned Splits (Topic Partitions) using the KafkaConsumer and deserializes the records using the provided Deserializer. The splits (Topic Partitions) do not have an end, so the reader never reaches the end of the data.
+
+**Bounded Kafka Source**
+
+Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.
+
+----
+----
+
+## The Data Source API
+
+Source, SourceEnumerator, SourceReader.
+
+env.continuousSource(...).
+
+----
+----
+
+## The Split Reader API
+
+
+core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
+in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
+Split Reader is base implementation for that.
+
+----
+----
+
+## Event Time and Watermarks
+
+*Event Time* assignment and *Watermark Generation* happen as part of the data sources. The event streams leaving the Source Readers have event timestamps and (during streaming execution) contain watermarks. See [Timely Stream Processing]({{ site.baseurl }}/concepts/timely-stream-processing.html) for an introduction to Event Time and Watermarks.
+
+<span class="label label-danger">Important</span> Applications based on the legacy [SourceFunction](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java) typically generate timestamps and watermarks in a separate later step via `stream.assignTimestampsAndWatermarks(WatermarkStrategy)`. This function should not be used with the new sources, because timestamps will be already assigned, and it will [...]
+
+#### API
+
+The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
+
+{% highlight java %}
+environment.continuousSource(
+    Source<OUT, ?, ?> source,
+    WatermarkStrategy<OUT> timestampsAndWatermarks,
+    String sourceName)
+{% endhighlight %}
+
+The `TimestampAssigner` and `WatermarkGenerator` run transparently as part of the `ReaderOutput`(or `SourceOutput`) so source implementors do not have to implement and timestamp extraction and watermark generation code.
+
+#### Event Timestamps
+
+Event timestamps are assigned in two steps:
+
+  1. The SourceReader may attach the *source record timestamp* to the event, by calling `SourceOutput.collect(event, timestamp)`.
+     This is relevant only for data sources that are record-based and have timestamps, such as Kafka, Kinesis, Pulsar, or Pravega.
+     Sources that are not based on records with timestamps (like files) do not have a *source record timestamp*.
+     This step is part of the source connector implementation and not parameterized by the application that uses the source.
+
+  2. The `TimestampAssigner`, which is configured by the application, assigns the final timestamp.
+     The `TimestampAssigner` sees the original *source record timestamp* and the event. The assigner can use the *source record timestamp* or access a field of the event to obtain the final event timestamp.
+  
+This two-step approach allows users to reference both timestamps from the source systems and timestamps in the event's data as the event timestamp.
+
+*Note:* When using a data source without *source record timestamps* (like files) and selecting the *source record timestamp* as the final event timestamp, events will get a default timestamp equal to `LONG_MIN` *(=-9,223,372,036,854,775,808)*.
+
+#### Watermark Generation
+
+Watermark Generators are only active during streaming execution. Batch execution deactivates Watermark Generators; all related operations described below become effectively no-ops.
+
+The data source API supports running watermark generators individually *per split*. That allows Flink to observe the event time progress per split individually, which is important to handle *event time skew* properly and prevent *idle partitions* from holding back the event time progress of the entire application.
+
+<div style="text-align: center">
+  <img width="80%" src="{{ site.baseurl }}/fig/per_split_watermarks.svg" alt="Watermark Generation in a Source with two Splits." />
+</div>
+
+When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.
+
+For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must ouput events from different splits to different outputs, the *split-local SourceOutputs*. Split-local outputs can be created and released on the main [ReaderOutput](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java) via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` [...]
diff --git a/docs/fig/per_split_watermarks.svg b/docs/fig/per_split_watermarks.svg
new file mode 100644
index 0000000..cc5371c
--- /dev/null
+++ b/docs/fig/per_split_watermarks.svg
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<svg width="916" height="303" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M192-10 1108-10 1108 293 192 293Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-192 10)"><path d="M490 109C490 43.8304 542.383-9 607-9 671.617-9 724 43.8304 724 109 724 174.17 671.617 227 607 227 542.383 227 490 174.17 490 109Z" fill="#AECBCC" fill-rule="evenodd"/><t [...]
\ No newline at end of file
diff --git a/docs/fig/source_components.svg b/docs/fig/source_components.svg
new file mode 100644
index 0000000..417b7ce
--- /dev/null
+++ b/docs/fig/source_components.svg
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<svg width="855" height="487" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M26 15 881 15 881 502 26 502Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath></defs><g clip-path="url(#clip0)" transform="translate(-26 -15)"><path d="M26 158.936C26 151.239 32.2394 145 39.936 145L326.064 145C333.761 145 340 151.239 340 158.936L340 298.064C340 305.761 333.761 312 326.064 312L39.936 312C32.2394 312 26 305.7 [...]
\ No newline at end of file


[flink] 02/04: [FLINK-10740][docs] Add documentation for FLIP-27 Source API and SplitReader API.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 846ae78e8d61a25a6906eaae53caa6d75e8da5d6
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Tue Jun 9 23:35:22 2020 +0800

    [FLINK-10740][docs] Add documentation for FLIP-27 Source API and SplitReader API.
---
 docs/dev/stream/sources.md    | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/dev/stream/sources.zh.md | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/fig/source_reader.svg    |  20 ++++
 3 files changed, 472 insertions(+), 24 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deseriali
 
 Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.
 
-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27, and provides tips for developers on Source development.
+
+### Source
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) API is a factory-style interface to create the following components:
+
+  - *Split Enumerator*
+  - *Source Reader*
+  - *Split Serializer*
+  - *Enumerator Checkpoint Serializer*
+  
+In addition to that, the Source provides the [boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java) attribute of the source, so that Flink can choose the appropriate mode to run the Flink jobs.
+
+The Source implementations should be serializable as the Source instances are serialized and uploaded to the Flink cluster at runtime.
+
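+As a rough, hedged skeleton (type names such as `MyRecord`, `MySplit` and `MyEnumeratorCheckpoint` are made up; see the linked `Source` class for the authoritative method signatures), a `Source` implementation mainly wires these pieces together:
+
+{% highlight java %}
+public class MySource implements Source<MyRecord, MySplit, MyEnumeratorCheckpoint> {
+
+    @Override
+    public Boundedness getBoundedness() {
+        // CONTINUOUS_UNBOUNDED for streaming-style sources, BOUNDED for batch-style sources
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<MyRecord, MySplit> createReader(SourceReaderContext readerContext) {...}
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> createEnumerator(
+            SplitEnumeratorContext<MySplit> enumContext) {...}
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<MySplit> enumContext, MyEnumeratorCheckpoint checkpoint) {...}
+
+    @Override
+    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {...}
+
+    @Override
+    public SimpleVersionedSerializer<MyEnumeratorCheckpoint> getEnumeratorCheckpointSerializer() {...}
+}
+{% endhighlight %}
+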
+### SplitEnumerator
+The [SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java) is expected to be the "brain" of the Source. Typical implementations of the `SplitEnumerator` do the following:
+
+  - `SourceReader` registration handling
+  - `SourceReader` failure handling
+    - The `addSplitsBack()` method will be invoked when a `SourceReader` fails. The SplitEnumerator should take back the split assignments that have not been acknowledged by the failed `SourceReader`.
+  - `SourceEvent` handling
+    - `SourceEvent`s are custom events sent between `SplitEnumerator` and `SourceReader`. The implementation can leverage this mechanism to perform sophisticated coordination.  
+  - Split discovery and assignment
+    - The `SplitEnumerator` can assign splits to the `SourceReader`s in response to various events, including discovery of new splits, new `SourceReader` registration, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the [SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java) which is provided to the `Source` on creation or restore of the `SplitEnumerator`. 
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve necessary information of the readers and perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` to the `SplitEnumerator` instance. 
+
+While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its method is invoked, some `SplitEnumerator` implementations might want to take actions actively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReaders`. 
+Such implementations may find the `callAsync()` method on the `SplitEnumeratorContext` handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit> {
+    private final long DISCOVER_INTERVAL = 60_000L;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+    
+    @Override
+    public void start() {
+        ...
+        enumContext.callAsync(this::discoverSplits, splits -> {
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            // Assign each discovered split to a reader based on the hash of its ID.
+            for (MySplit split : splits) {
+                int owner = split.splitId().hashCode() % parallelism;
+                assignments.computeIfAbsent(owner, id -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
+
+### SourceReader
+
+The [SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java) is a component running in the Task Managers to consume the records from the Splits. 
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the `SourceReader`. The return value of the `pollNext(ReaderOutput)` method indicates the status of the source reader.
 
-Source, SourceEnumerator, SourceReader.
+  - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+  - `NOTHING_AVAILABLE` - The SourceReader does not have more records available at this point, but may have more records in the future.
+  - `END_OF_INPUT` - The SourceReader has exhausted all the records and reached the end of data. This means the SourceReader can be closed.
 
-env.continuousSource(...).
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so a `SourceReader` can emit multiple records in a single call of pollNext() if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records but the source can only checkpoint at the block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple records in a single `pollNext(ReaderOutput)` invocation unless necessary.** 
 
-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s which are returned at the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReaders` when needed.
+
+A `SourceReaderContext` is provided to the `Source` upon a `SourceReader` creation. It is expected that the `Source` will pass the context to the `SourceReader` instance. The `SourceReader` can send `SourceEvent` to its `SplitEnumerator`. A typical design pattern for the `Source` is letting the `SourceReader`s report their local information to the `SplitEnumerator` who has a global view to make decisions.
+
+The `SourceReader` API is a low level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink has provided a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class which significantly reduces the amount of work needed to write a `Sou [...]
+**It is highly recommended for the connector developers to take advantage of the `SourceReaderBase` instead of writing the `SourceReader`s by themselves from scratch**. For more details please check the [Split Reader API](#the-split-reader-api) section.
+
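+Conceptually, the pull-based contract can be pictured like this (a simplified sketch, not the actual Flink task loop; it assumes the status values listed above are exposed as an `InputStatus` enum, and `waitForMoreRecords()` is just a placeholder for waiting until the reader signals availability again):
+
+{% highlight java %}
+InputStatus status;
+do {
+    // The task repeatedly polls the reader and reacts to the returned status.
+    status = sourceReader.pollNext(readerOutput);
+    if (status == InputStatus.NOTHING_AVAILABLE) {
+        waitForMoreRecords();   // placeholder: wait before polling again
+    }
+    // MORE_AVAILABLE: simply poll again right away
+} while (status != InputStatus.END_OF_INPUT);
+{% endhighlight %}
+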
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the `Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>
 
 ## The Split Reader API
+Core SourceReader API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
+[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the API for such high level API.
+The `SplitReader` API is designed work together with `SourceReaderBase` class which is a base implementation for the `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+  - A blocking fetch method to return a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java)
+  - A non-blocking method to handle split changes.
+  - A non-blocking wake up method to wake up the blocking fetch.
+
+The `SplitReader` only focuses on reading records from the external system and is therefore much simpler than the `SourceReader`.
+Please check the Java doc of the class for more details.
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+  - Have a pool of threads fetching from splits of the external system in a blocking way.
+  - Handle the synchronization between the fetching threads mentioned above and the `pollNext(ReaderOutput)` invocation.
+  - Maintain the per split watermark for watermark alignment.
+  - Maintain the state of each split for checkpoint
+  
+In order to reduce the work of writing a new `SourceReader`, Flink provides a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class to serve as a base implementation of the `SourceReader`. 
+`SourceReaderBase` has all the above work done out of the box. To write a new `SourceReader`, one can just let the `SourceReader` implementation inherit from the `SourceReaderBase`, fill in a few methods and implement a high level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.
 
-core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-Split Reader is base implementation for that.
+As an example, as illustrated below, a `SplitFetcherManager` may have a fixed number of threads, each fetching from some splits assigned to the `SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            int ownerFetcherIndex = split.hashCode() % numFetchers;
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>
 
-----
-----
+`SourceReader` implementations can also easily implement their own threading model on top of the `SplitFetcherManager` and `SourceReaderBase`.
 
 ## Event Time and Watermarks
 
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deseriali
 
 Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.
 
-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27, and provides tips for developers on implementing a Source.
+
+### Source
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) API is a factory-style interface to create the following components.
+
+  - *Split Enumerator*
+  - *Source Reader*
+  - *Split Serializer*
+  - *Enumerator Checkpoint Serializer*
+  
+In addition to that, the Source provides the [boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java) attribute of the source, so that Flink can choose the appropriate mode to run the Flink jobs.
+
+Source implementations should be serializable, as Source instances are serialized and uploaded to the Flink cluster at runtime.
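+
+Putting these pieces together, a `Source` implementation skeleton might look roughly like the sketch below. This is only a sketch: `MyRecord`, `MySplit`, `MyEnumeratorCheckpoint` and the other `My*` classes are hypothetical placeholders, and the exact method signatures should be checked against the linked `Source` interface.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class MySource implements Source<MyRecord, MySplit, MyEnumeratorCheckpoint> {
+
+    @Override
+    public Boundedness getBoundedness() {
+        // Tell Flink whether this source is bounded or runs continuously.
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<MyRecord, MySplit> createReader(SourceReaderContext readerContext) {
+        return new MySourceReader(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> createEnumerator(
+            SplitEnumeratorContext<MySplit> enumContext) {
+        return new MySplitEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<MySplit> enumContext, MyEnumeratorCheckpoint checkpoint) {
+        // Re-create the enumerator from a previously checkpointed state.
+        return new MySplitEnumerator(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
+        return new MySplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MyEnumeratorCheckpoint> getEnumeratorCheckpointSerializer() {
+        return new MyEnumeratorCheckpointSerializer();
+    }
+}
+{% endhighlight %}
+</div>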
+
+### SplitEnumerator
+The [SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java) is expected to be the "brain" of the Source. Typical implementations of the `SplitEnumerator` do the following:
+
+  - `SourceReader` registration handling
+  - `SourceReader` failure handling
+    - The `addSplitsBack()` method will be invoked when a `SourceReader` fails. The SplitEnumerator should take back the split assignments that have not been acknowledged by the failed `SourceReader`.
+  - `SourceEvent` handling
+    - `SourceEvent`s are custom events sent between `SplitEnumerator` and `SourceReader`. The implementation can leverage this mechanism to perform sophisticated coordination.  
+  - Split discovery and assignment
+    - The `SplitEnumerator` can assign splits to the `SourceReader`s in response to various events, including discovery of new splits, new `SourceReader` registration, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the [SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java) which is provided to the `Source` on creation or restore of the `SplitEnumerator`. 
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve the necessary information about the readers and to perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` to the `SplitEnumerator` instance. 
+
+While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its methods are invoked, some `SplitEnumerator` implementations might want to take actions proactively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReader`s.
+Such implementations may find the `callAsync()` method on the `SplitEnumeratorContext` handy. The code snippet below shows how a `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit> {
+    private final long DISCOVER_INTERVAL = 60_000L;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+    
+    @Override
+    public void start() {
+        ...
+        enumContext.callAsync(this::discoverSplits, splits -> {
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                int owner = split.splitId().hashCode() % parallelism;
+                assignments.computeIfAbsent(owner, s -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
+
+### SourceReader
+
+The [SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java) is a component running in the Task Managers to consume the records from the Splits. 
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the `SourceReader`. The return value of the `pollNext(ReaderOutput)` method indicates the status of the source reader.
 
-Source, SourceEnumerator, SourceReader.
+  - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+  - `NOTHING_AVAILABLE` - The SourceReader does not have more records available at this point, but may have more records in the future.
+  - `END_OF_INPUT` - The SourceReader has exhausted all the records and reached the end of data. This means the SourceReader can be closed.
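+
+As a rough illustration of this contract, the polling cycle can be pictured like the hypothetical sketch below (this is not the actual Flink runtime loop, and `MyRecord`/`MySplit` are placeholder types):
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Hypothetical sketch of the polling contract; the real task loop is more
+// involved (mailbox, checkpoints, availability notifications, ...).
+void drive(SourceReader<MyRecord, MySplit> reader, ReaderOutput<MyRecord> output) throws Exception {
+    InputStatus status = reader.pollNext(output);
+    while (status != InputStatus.END_OF_INPUT) {
+        if (status == InputStatus.NOTHING_AVAILABLE) {
+            // Wait until the reader signals that more records may become available.
+            reader.isAvailable().get();
+        }
+        status = reader.pollNext(output);
+    }
+    // END_OF_INPUT: the reader is exhausted and can be closed.
+}
+{% endhighlight %}
+</div>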
 
-env.continuousSource(...).
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so that a `SourceReader` can emit multiple records in a single call to `pollNext()` if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records, but the source can only checkpoint at block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple records in a single `pollNext(ReaderOutput)` invocation unless necessary.** 
 
-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s, which are returned when `snapshotState()` is invoked. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReader`s when needed.
+
+A `SourceReaderContext` is provided to the `Source` upon creation of a `SourceReader`. It is expected that the `Source` will pass the context to the `SourceReader` instance. The `SourceReader` can send `SourceEvent`s to its `SplitEnumerator`. A typical design pattern for the `Source` is letting the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view to make decisions.
+
+The `SourceReader` API is a low level API that allows users to deal with the splits manually and have their own threading model to fetch and handover the records. To facilitate the `SourceReader` implementation, Flink has provided a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class which significantly reduce the amount the work needed to write a `Sou [...]
+**It is highly recommended that connector developers take advantage of the `SourceReaderBase` instead of writing `SourceReader`s from scratch**. For more details please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the `Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>
 
 ## The Split Reader API
+The core `SourceReader` API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, however, many sources perform blocking poll() and I/O operations that require separate threads.
+The [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) API is a higher-level API for such cases.
+The `SplitReader` API is designed to work together with the `SourceReaderBase` class, which is a base implementation for `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+  - A blocking fetch method that returns a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java).
+  - A non-blocking method to handle split changes.
+  - A non-blocking wake-up method to interrupt a blocking fetch.
+
+The `SplitReader` only focuses on reading records from the external system and is therefore much simpler than the `SourceReader`.
+Please check the Javadoc of the class for more details.
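+
+To make this contract concrete, here is a rough, hypothetical schematic of that shape. This is *not* the actual Flink interface: the exact generic parameters and method signatures are in the linked Javadoc.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Schematic only: a simplified stand-in that mirrors the shape of the real
+// SplitReader interface; see the linked Javadoc for the actual signatures.
+interface SplitReaderSketch<E, SplitT> {
+
+    // Blocking fetch: return the next batch of records, keyed by the ID of
+    // the split each group of records was read from.
+    Map<String, Collection<E>> fetch() throws IOException;
+
+    // Non-blocking: hand newly assigned splits (or other split changes)
+    // over to the reader.
+    void handleSplitsChanges(List<SplitT> newSplits);
+
+    // Non-blocking: wake up a fetch() that is currently blocking, for example
+    // because new splits arrived or the reader is shutting down.
+    void wakeUp();
+}
+{% endhighlight %}
+</div>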
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+  - Have a pool of threads fetching from splits of the external system in a blocking way.
+  - Handle the synchronization between the fetching threads mentioned above and the `pollNext(ReaderOutput)` invocation.
+  - Maintain the per-split watermarks for watermark alignment.
+  - Maintain the state of each split for checkpointing.
+  
+In order to reduce the work of writing a new `SourceReader`, Flink provides a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class to serve as a base implementation of the `SourceReader`. 
+`SourceReaderBase` takes care of all the above work out of the box. To write a new `SourceReader`, one can simply let the `SourceReader` implementation extend `SourceReaderBase`, fill in a few methods, and implement a high-level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s, each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.
 
-core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-Split Reader is base implementation for that.
+For example, as illustrated below, a `SplitFetcherManager` may have a fixed number of threads, each fetching from some of the splits assigned to the `SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            int ownerFetcherIndex = split.hashCode() % numFetchers;
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>
 
-----
-----
+`SourceReader` implementations can also easily implement their own threading model on top of the `SplitFetcherManager` and `SourceReaderBase`.
 
 ## Event Time and Watermarks
 
diff --git a/docs/fig/source_reader.svg b/docs/fig/source_reader.svg
new file mode 100644
index 0000000..1d4d9ac
--- /dev/null
+++ b/docs/fig/source_reader.svg
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<svg width="1257" height="653" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath id="clip0"><path d="M-11 41 1246 41 1246 694-11 694Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip1"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath><clipPath id="clip2"><path d="M1017 309 1130 309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath>< [...]
\ No newline at end of file