You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/08/30 11:22:09 UTC

[flink] branch master updated: [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b3dcafa9db2 [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page
b3dcafa9db2 is described below

commit b3dcafa9db278fc02945c7bc5c32765c99d00bb1
Author: Chengkai Yang <ya...@163.com>
AuthorDate: Sun Jul 3 22:44:47 2022 +0800

    [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page
---
 docs/content.zh/docs/dev/table/sourcesSinks.md | 329 +++++++++++--------------
 1 file changed, 144 insertions(+), 185 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md
index 36fcf912e7f..e897f15a1f2 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -133,242 +133,209 @@ If you need a feature available only internally within the `org.apache.flink.tab
 To learn more, check out [Anatomy of Table Dependencies]({{< ref "docs/dev/configuration/advanced" >}}#anatomy-of-table-dependencies).
 {{< /hint >}}
 
-Extension Points
+<a name="extension-points"></a>
+
+扩展点
 ----------------
 
-This section explains the available interfaces for extending Flink's table connectors.
+这一部分主要介绍扩展 Flink table connector 时可能用到的接口。
+
+<a name="dynamic-table-factories"></a>
 
-### Dynamic Table Factories
+### 动态表的工厂类
 
-Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog
-and session information.
+在根据 catalog 与 Flink 运行时上下文信息,为某个外部存储系统配置动态表连接器时,需要用到动态表的工厂类。
 
-`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`.
+比如,通过实现 `org.apache.flink.table.factories.DynamicTableSourceFactory` 接口完成一个工厂类,来生产 `DynamicTableSource` 类。
 
-`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`.
+通过实现 `org.apache.flink.table.factories.DynamicTableSinkFactory` 接口完成一个工厂类,来生产 `DynamicTableSink` 类。
 
-By default, the factory is discovered using the value of the `connector` option as the factory identifier
-and Java's Service Provider Interface.
+默认情况下,Java 的 SPI 机制会自动识别这些工厂类,同时将 `connector` 配置项作为工厂类的”标识符“。
 
-In JAR files, references to new implementations can be added to the service file:
+在 JAR 文件中,需要将实现的工厂类路径放入到下面这个配置文件:
 
 `META-INF/services/org.apache.flink.table.factories.Factory`
 
-The framework will check for a single matching factory that is uniquely identified by factory identifier
-and requested base class (e.g. `DynamicTableSourceFactory`).
+Flink 会对工厂类逐个进行检查,确保其“标识符”是全局唯一的,并且按照要求实现了上面提到的接口 (比如 `DynamicTableSourceFactory`)。
+
+如果必要的话,也可以在实现 catalog 时绕过上述 SPI 机制识别工厂类的过程。即在实现 catalog 接口时,在`org.apache.flink.table.catalog.Catalog#getFactory` 方法中直接返回工厂类的实例。
+
+<a name="dynamic-table-source"></a>
 
-The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a
-catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`.
+### 动态表的 source 端
 
-### Dynamic Table Source
+按照定义,动态表是随时间变化的。
 
-By definition, a dynamic table can change over time.
+在读取动态表时,表中数据可以是以下情况之一:
+- changelog 流(支持有界或无界),在 changelog 流结束前,所有的改变都会被源源不断地消费,由 `ScanTableSource` 接口表示。
+- 处于一直变换或数据量很大的外部表,其中的数据一般不会被全量读取,除非是在查询某个值时,由 `LookupTableSource` 接口表示。
 
-When reading a dynamic table, the content can either be considered as:
-- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog
-  is exhausted. This is represented by the `ScanTableSource` interface.
-- A continuously changing or very large external table whose content is usually never read entirely
-  but queried for individual values when necessary. This is represented by the `LookupTableSource`
-  interface.
+一个类可以同时实现这两个接口,Planner 会根据查询的 Query 选择相应接口中的方法。
 
-A class can implement both of these interfaces at the same time. The planner decides about their usage depending
-on the specified query.
+<a name= "scan-table-source"></a>
 
 #### Scan Table Source
 
-A `ScanTableSource` scans all rows from an external storage system during runtime.
+在运行期间,`ScanTableSource` 接口会按行扫描外部存储系统中所有数据。
 
-The scanned rows don't have to contain only insertions but can also contain updates and deletions. Thus,
-the table source can be used to read a (finite or infinite) changelog. The returned _changelog mode_ indicates
-the set of changes that the planner can expect during runtime.
+被扫描的数据可以是 insert、update、delete 三种操作类型,因此数据源可以用作读取 changelog (支持有界或无界)。在运行时,返回的 **_changelog mode_** 表示 Planner 要处理的操作类型。
 
-For regular batch scenarios, the source can emit a bounded stream of insert-only rows.
+在常规批处理的场景下,数据源可以处理 insert-only 操作类型的有界数据流。
 
-For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows.
+在常规流处理的场景下,数据源可以处理 insert-only 操作类型的无界数据流。
 
-For change data capture (CDC) scenarios, the source can emit bounded or unbounded streams with insert,
-update, and delete rows.
+在变更日志数据捕获(即 CDC)场景下,数据源可以处理 insert、update、delete 操作类型的有界或无界数据流。
 
-A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might
-mutate an instance during planning. All abilities can be found in the `org.apache.flink.table.connector.source.abilities`
-package and are listed in the [source abilities table](#source-abilities).
+可以实现更多的功能接口来优化数据源,比如实现 `SupportsProjectionPushDown` 接口,这样在运行时在 source 端就处理数据。在 `org.apache.flink.table.connector.source.abilities` 包下可以找到各种功能接口,更多内容可查看 [source abilities table](#source-abilities)。
 
-The runtime implementation of a `ScanTableSource` must produce internal data structures. Thus, records
-must be emitted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such
-that a source can still work on common data structures and perform a conversion at the end.
+实现 `ScanTableSource` 接口的类必须能够生产 Flink 内部数据结构,因此每条记录都会按照`org.apache.flink.table.data.RowData` 的方式进行处理。Flink 运行时提供了转换机制保证 source 端可以处理常见的数据结构,并且在最后进行转换。
+
+<a name="lookup-table-source"></a>
 
 #### Lookup Table Source
 
-A `LookupTableSource` looks up rows of an external storage system by one or more keys during runtime.
+在运行期间,`LookupTableSource` 接口会在外部存储系统中按照 key 进行查找。
+
+相比于`ScanTableSource`,`LookupTableSource` 接口不会全量读取表中数据,只会在需要时向外部存储(其中的数据有可能会一直变化)发起查询请求,惰性地获取数据。
 
-Compared to `ScanTableSource`, the source does not have to read the entire table and can lazily fetch individual
-values from a (possibly continuously changing) external table when necessary.
+同时相较于`ScanTableSource`,`LookupTableSource` 接口目前只支持处理 insert-only 数据流。
 
-Compared to `ScanTableSource`, a `LookupTableSource` does only support emitting insert-only changes currently.
+暂时不支持扩展功能接口,可查看 `org.apache.flink.table.connector.source.LookupTableSource` 中的文档了解更多。
 
-Further abilities are not supported. See the documentation of `org.apache.flink.table.connector.source.LookupTableSource`
-for more information.
+`LookupTableSource` 的实现方法可以是 `TableFunction` 或者 `AsyncTableFunction`,Flink运行时会根据要查询的 key 值,调用这个实现方法进行查询。
 
-The runtime implementation of a `LookupTableSource` is a `TableFunction` or `AsyncTableFunction`. The function
-will be called with values for the given lookup keys during runtime.
+<a name="source-abilities"></a>
 
-#### Source Abilities
+#### source 端的功能接口
 
 <table class="table table-bordered">
     <thead>
         <tr>
-        <th class="text-left" style="width: 25%">Interface</th>
-        <th class="text-center" style="width: 75%">Description</th>
+        <th class="text-left" style="width: 25%">接口</th>
+        <th class="text-center" style="width: 75%">描述</th>
         </tr>
     </thead>
     <tbody>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java" name="SupportsFilterPushDown" >}}</td>
-        <td>Enables to push down the filter into the <code>DynamicTableSource</code>. For efficiency, a source can
-        push filters further down in order to be close to the actual data generation.</td>
+        <td>支持将过滤条件下推到 <code>DynamicTableSource</code>。为了更高效处理数据,source 端会将过滤条件下推,以便在数据产生时就处理。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLimitPushDown.java" name="SupportsLimitPushDown" >}}</td>
-        <td>Enables to push down a limit (the expected maximum number of produced records) into a <code>DynamicTableSource</code>.</td>
+        <td>支持将 limit(期望生产的最大数据条数)下推到 <code>DynamicTableSource</code>。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitionPushDown.java" name="SupportsPartitionPushDown" >}}</td>
-        <td>Enables to pass available partitions to the planner and push down partitions into a <code>DynamicTableSource</code>.
-        During the runtime, the source will only read data from the passed partition list for efficiency.</td>
+        <td>支持将可用的分区信息提供给 planner 并且将分区信息下推到 <code>DynamicTableSource</code>。在运行时为了更高效处理数据,source 端会只从提供的分区列表中读取数据。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java" name="SupportsProjectionPushDown" >}}</td>
-        <td>Enables to push down a (possibly nested) projection into a <code>DynamicTableSource</code>. For efficiency,
-        a source can push a projection further down in order to be close to the actual data generation. If the source
-        also implements <code>SupportsReadingMetadata</code>, the source will also read the required metadata only.
+        <td>支持将查询列(可嵌套)下推到 <code>DynamicTableSource</code>。为了更高效处理数据,source 端会将查询列下推,以便在数据产生时就处理。如果 source 端同时实现了 <code>SupportsReadingMetadata</code>,那么 source 端也会读取相对应列的元数据信息。
         </td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java" name="SupportsReadingMetadata" >}}</td>
-        <td>Enables to read metadata columns from a <code>DynamicTableSource</code>. The source
-        is responsible to add the required metadata at the end of the produced rows. This includes
-        potentially forwarding metadata column from contained formats.</td>
+        <td>支持通过 <code>DynamicTableSource</code> 读取列的元数据信息。source 端会在生产数据行时,在最后添加相应的元数据信息,其中包括元数据的格式信息。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java" name="SupportsWatermarkPushDown" >}}</td>
-        <td>Enables to push down a watermark strategy into a <code>DynamicTableSource</code>. The watermark
-        strategy is a builder/factory for timestamp extraction and watermark generation. During the runtime, the
-        watermark generator is located inside the source and is able to generate per-partition watermarks.</td>
+        <td>支持将水印策略下推到 <code>DynamicTableSource</code>。水印策略可以通过工厂模式或 Builder 模式来构建,用于抽取时间戳以及水印的生成。在运行时,source 端内部的水印生成器会为每个分区生产水印。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsSourceWatermark.java" name="SupportsSourceWatermark" >}}</td>
-        <td>Enables to fully rely on the watermark strategy provided by the <code>ScanTableSource</code>
-        itself. Thus, a <code>CREATE TABLE</code> DDL is able to use <code>SOURCE_WATERMARK()</code> which
-        is a built-in marker function that will be detected by the planner and translated into a call
-        to this interface if available.</td>
+        <td>支持使用 <code>ScanTableSource</code> 中提供的水印策略。当使用 <code>CREATE TABLE</code> DDL 时,<可以使用></可以使用> <code>SOURCE_WATERMARK()</code> 来告诉 planner 调用这个接口中的水印策略方法。</td>
     </tr>
     </tbody>
 </table>
 
-<span class="label label-danger">Attention</span> The interfaces above are currently only available for
-`ScanTableSource`, not for `LookupTableSource`.
+<span class="label label-danger">注意</span>上述接口当前只适用于 `ScanTableSource`,不适用于`LookupTableSource`。
+
+<a name="dynamic-table-sink"></a>
 
-### Dynamic Table Sink
+### 动态表的 sink 端
 
-By definition, a dynamic table can change over time.
+按照定义,动态表是随时间变化的。
 
-When writing a dynamic table, the content can always be considered as a changelog (finite or infinite)
-for which all changes are written out continuously until the changelog is exhausted. The returned _changelog mode_
-indicates the set of changes that the sink accepts during runtime.
+当写入一个动态表时,数据流可以被看作是 changelog (有界或无界都可),在 changelog 结束前,所有的变更都会被持续写入。在运行时,返回的 **_changelog mode_** 会显示 sink 端支持的数据操作类型。
 
-For regular batch scenarios, the sink can solely accept insert-only rows and write out bounded streams.
+在常规批处理的场景下,sink 端可以持续接收 insert-only 操作类型的数据,并写入到有界数据流中。
 
-For regular streaming scenarios, the sink can solely accept insert-only rows and can write out unbounded streams.
+在常规流处理的场景下,sink 端可以持续接收 insert-only 操作类型的数据,并写入到无界数据流中。
 
-For change data capture (CDC) scenarios, the sink can write out bounded or unbounded streams with insert,
-update, and delete rows.
+在变更日志数据捕获(即 CDC)场景下,sink 端可以将 insert、update、delete 操作类型的数据写入有界或无界数据流。
 
-A table sink can implement further ability interfaces such as `SupportsOverwrite` that might mutate an
-instance during planning. All abilities can be found in the `org.apache.flink.table.connector.sink.abilities`
-package and are listed in the [sink abilities table](#sink-abilities).
+可以实现 `SupportsOverwrite` 等功能接口,在 sink 端处理数据。可以在 `org.apache.flink.table.connector.sink.abilities` 包下找到各种功能接口,更多内容可查看[sink abilities table](#sink-abilities)。
 
-The runtime implementation of a `DynamicTableSink` must consume internal data structures. Thus, records
-must be accepted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such
-that a sink can still work on common data structures and perform a conversion at the beginning.
+实现 `DynamicTableSink` 接口的类必须能够处理 Flink 内部数据结构,因此每条记录都会按照 `org.apache.flink.table.data.RowData` 的方式进行处理。Flink 运行时提供了转换机制来保证在最开始进行数据类型转换,以便 sink 端可以处理常见的数据结构。
 
-#### Sink Abilities
+<a name="sink-abilities"></a>
+
+#### sink 端的功能接口
 
 <table class="table table-bordered">
     <thead>
         <tr>
-        <th class="text-left" style="width: 25%">Interface</th>
-        <th class="text-center" style="width: 75%">Description</th>
+        <th class="text-left" style="width: 25%">接口</th>
+        <th class="text-center" style="width: 75%">描述</th>
         </tr>
     </thead>
     <tbody>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsOverwrite.java" name="SupportsOverwrite" >}}</td>
-        <td>Enables to overwrite existing data in a <code>DynamicTableSink</code>. By default, if
-        this interface is not implemented, existing tables or partitions cannot be overwritten using
-        e.g. the SQL <code>INSERT OVERWRITE</code> clause.</td>
+        <td>支持 <code>DynamicTableSink</code> 覆盖写入已存在的数据。默认情况下,如果不实现这个接口,在使用 <code>INSERT OVERWRITE</code> SQL 语法时,已存在的表或分区不会被覆盖写入</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java" name="SupportsPartitioning" >}}</td>
-        <td>Enables to write partitioned data in a <code>DynamicTableSink</code>.</td>
+        <td>支持 <code>DynamicTableSink</code> 写入元数据列。</td>
     </tr>
     <tr>
         <td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java" name="SupportsWritingMetadata" >}}</td>
-        <td>Enables to write metadata columns into a <code>DynamicTableSource</code>. A table sink is
-        responsible for accepting requested metadata columns at the end of consumed rows and persist
-        them. This includes potentially forwarding metadata columns to contained formats.</td>
+        <td>支持 <code>DynamicTableSource</code> 写入元数据列。sink 端会在消费数据行时,在最后接受相应的元数据信息并进行持久化,其中包括元数据的格式信息。</td>
     </tr>
     </tbody>
 </table>
 
-### Encoding / Decoding Formats
+<a name="encoding--decoding-formats"></a>
+
+### 编码与解码
 
-Some table connectors accept different formats that encode and decode keys and/or values.
+有的表连接器支持 K/V 型数据的各类编码与解码方式。
 
-Formats work similar to the pattern `DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider`,
-where the factory is responsible for translating options and the source is responsible for creating runtime logic.
+编码与解码格式器的工作原理类似于 `DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider`,其中工厂类负责传参,source 负责提供处理逻辑。
 
-Because formats might be located in different modules, they are discovered using Java's Service Provider
-Interface similar to [table factories](#dynamic-table-factories). In order to discover a format factory,
-the dynamic table factory searches for a factory that corresponds to a factory identifier and connector-specific
-base class.
+由于编码与解码格式器处于不同的代码模块,类似于[table factories](#dynamic-table-factories),它们都需要通过 Java 的 SPI 机制自动识别。为了找到格式器的工厂类,动态表工厂类会根据该格式器工厂类的”标识符“来搜索,并确认其实现了连接器相关的基类。
 
-For example, the Kafka table source requires a `DeserializationSchema` as runtime interface for a decoding
-format. Therefore, the Kafka table source factory uses the value of the `value.format` option to discover
-a `DeserializationFormatFactory`.
+比如,Kafka 的 source 端需要一个实现了 `DeserializationSchema` 接口的类,用来为数据解码。那么 Kafka 的 source 端工厂类会使用配置项 `value.format` 的值来发现 `DeserializationFormatFactory`。
 
-The following format factories are currently supported:
+目前,支持使用如下格式器工厂类:
 
 ```
 org.apache.flink.table.factories.DeserializationFormatFactory
 org.apache.flink.table.factories.SerializationFormatFactory
 ```
 
-The format factory translates the options into an `EncodingFormat` or a `DecodingFormat`. Those interfaces are
-another kind of factory that produce specialized format runtime logic for the given data type.
+格式器工厂类再将配置传参给 `EncodingFormat` 或 `DecodingFormat`。这些接口是另外一种工厂类,用于为所给的数据类型生成指定的格式器。
 
-For example, for a Kafka table source factory, the `DeserializationFormatFactory` would return an `EncodingFormat<DeserializationSchema>`
-that can be passed into the Kafka table source.
+例如 Kafka 的 source 端工厂类 `DeserializationFormatFactory` 会为 Kafka 的 source 端返回 `EncodingFormat<DeserializationSchema>`
 
 {{< top >}}
 
-Full Stack Example
+<a name="full-stack-example"></a>
+
+全栈实例
 ------------------
 
-This section sketches how to implement a scan table source with a decoding format that supports changelog
-semantics. The example illustrates how all of the mentioned components play together. It can serve as
-a reference implementation.
+这一部分介绍如何实现一个 CDC 场景下 scan table 类型的 source 端,同时支持自定义解码器下面的例子中使用了上面提到的所有内容,作为一个参考。
 
-In particular, it shows how to
-- create factories that parse and validate options,
-- implement table connectors,
-- implement and discover custom formats,
-- and use provided utilities such as data structure converters and the `FactoryUtil`.
+这个例子展示了:
+- 创建工厂类实现配置项的解析与校验
+- 实现表连接器
+- 实现与发现自定义的编码/解码格式器
+- 其他工具类,数据结构的转换器以及一个`FactoryUtil`类。
 
-The table source uses a simple single-threaded `SourceFunction` to open a socket that listens for incoming
-bytes. The raw bytes are decoded into rows by a pluggable format. The format expects a changelog flag
-as the first column.
+source 端通过实现一个单线程的 `SourceFunction` 接口,绑定一个 socket 端口来监听字节流字节流会被解码为一行一行的数据,解码器是可插拔的。解码方式是将第一列数据作为这条数据的操作类型。
 
-We will use most of the interfaces mentioned above to enable the following DDL:
+我们使用上文提到的一系列接口来构建这个例子,并通过如下这个 DDL 来创建表:
 
 ```sql
 CREATE TABLE UserScores (name STRING, score INT)
@@ -382,14 +349,13 @@ WITH (
 );
 ```
 
-Because the format supports changelog semantics, we are able to ingest updates during runtime and create
-an updating view that can continuously evaluate changing data:
+由于解码时支持 CDC 语义,我们可以在运行时更新数据并且创建一个更新视图,并源源不断地处理变更数据:
 
 ```sql
 SELECT name, SUM(score) FROM UserScores GROUP BY name;
 ```
 
-Use the following command to ingest data in a terminal:
+在命令行中输入如下指令,并输入数据:
 ```text
 > nc -lk 9999
 INSERT|Alice|12
@@ -398,16 +364,17 @@ DELETE|Alice|12
 INSERT|Alice|18
 ```
 
-### Factories
+<a name="factories"></a>
+
+### 工厂类
 
-This section illustrates how to translate metadata coming from the catalog to concrete connector instances.
+这一部分主要介绍如何从 catalog 中解析元数据信息来构建表连接器的实例。
 
-Both factories have been added to the `META-INF/services` directory.
+下面的工厂类都以添加到 `META-INF/services` 目录下。
 
 **`SocketDynamicTableFactory`**
 
-The `SocketDynamicTableFactory` translates the catalog table to a table source. Because the table source
-requires a decoding format, we are discovering the format using the provided `FactoryUtil` for convenience.
+`SocketDynamicTableFactory` 根据 catalog 表信息,生成表的 source 端。由于 source 端需要进行对数据解码,我们通过 `FactoryUtil` 类来找到解码器。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -424,7 +391,7 @@ import org.apache.flink.table.types.DataType;
 
 public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 
-  // define all options statically
+  // 定义所有配置项
   public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
     .stringType()
     .noDefaultValue();
@@ -435,11 +402,11 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 
   public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
     .intType()
-    .defaultValue(10); // corresponds to '\n'
+    .defaultValue(10); // 等同于 '\n'
 
   @Override
   public String factoryIdentifier() {
-    return "socket"; // used for matching to `connector = '...'`
+    return "socket"; // 用于匹配: `connector = '...'`
   }
 
   @Override
@@ -447,7 +414,7 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
     final Set<ConfigOption<?>> options = new HashSet<>();
     options.add(HOSTNAME);
     options.add(PORT);
-    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
+    options.add(FactoryUtil.FORMAT); // 解码的格式器使用预先定义的配置项
     return options;
   }
 
@@ -460,29 +427,28 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
-    // either implement your custom validation logic here ...
-    // or use the provided helper utility
+    // 使用提供的工具类或实现你自己的逻辑进行校验
     final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-    // discover a suitable decoding format
+    // 找到合适的解码器
     final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
       DeserializationFormatFactory.class,
       FactoryUtil.FORMAT);
 
-    // validate all options
+    // 校验所有的配置项
     helper.validate();
 
-    // get the validated options
+    // 获取校验完的配置项
     final ReadableConfig options = helper.getOptions();
     final String hostname = options.get(HOSTNAME);
     final int port = options.get(PORT);
     final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
 
-    // derive the produced data type (excluding computed columns) from the catalog table
+    // 从 catalog 中抽取要生产的数据类型 (除了需要计算的列) 
     final DataType producedDataType =
             context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
 
-    // create and return dynamic table source
+    // 创建并返回动态表 source
     return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
   }
 }
@@ -490,11 +456,9 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 
 **`ChangelogCsvFormatFactory`**
 
-The `ChangelogCsvFormatFactory` translates format-specific options to a format. The `FactoryUtil` in `SocketDynamicTableFactory`
-takes care of adapting the option keys accordingly and handles the prefixing like `changelog-csv.column-delimiter`.
+`ChangelogCsvFormatFactory` 根据解码器相关的配置构建解码器。`SocketDynamicTableFactory` 中的 `FactoryUtil` 会适配好配置项中的键,并处理 `changelog-csv.column-delimiter` 这样带有前缀的键。
 
-Because this factory implements `DeserializationFormatFactory`, it could also be used for other connectors
-that support deserialization formats such as the Kafka connector.
+由于这个工厂类实现了 `DeserializationFormatFactory` 接口,它也可以为其他连接器(比如 Kafka 连接器)提供反序列化的解码支持。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -509,7 +473,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 
 public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
 
-  // define all options statically
+  // 定义所有配置项
   public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
     .stringType()
     .defaultValue("|");
@@ -535,30 +499,27 @@ public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
   public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
       DynamicTableFactory.Context context,
       ReadableConfig formatOptions) {
-    // either implement your custom validation logic here ...
-    // or use the provided helper method
+    // 使用提供的工具类或实现你自己的逻辑进行校验
     FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-    // get the validated options
+    // 获取校验完的配置项
     final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
 
-    // create and return the format
+    // 创建并返回解码器
     return new ChangelogCsvFormat(columnDelimiter);
   }
 }
 ```
 
-### Table Source and Decoding Format
+<a name="table-source-and-decoding-format"></a>
 
-This section illustrates how to translate from instances of the planning layer to runtime instances that
-are shipped to the cluster.
+### source 端与解码
+
+这部分主要介绍在计划阶段的 source 与 解码器实例,是如何转化为运行时实例,以便于提交给集群。
 
 **`SocketDynamicTableSource`**
 
-The `SocketDynamicTableSource` is used during planning. In our example, we don't implement any of the
-available ability interfaces. Therefore, the main logic can be found in `getScanRuntimeProvider(...)`
-where we instantiate the required `SourceFunction` and its `DeserializationSchema` for runtime. Both
-instances are parameterized to return internal data structures (i.e. `RowData`).
+`SocketDynamicTableSource` 在计划阶段中会被用到。在我们的例子中,我们不会实现任何功能接口,因此,`getScanRuntimeProvider(...)` 方法中就是主要逻辑:对 `SourceFunction` 以及其用到的 `DeserializationSchema` 进行实例化,作为运行时的实例。两个实例都被参数化来返回内部数据结构(比如 `RowData`)。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -594,15 +555,15 @@ public class SocketDynamicTableSource implements ScanTableSource {
 
   @Override
   public ChangelogMode getChangelogMode() {
-    // in our example the format decides about the changelog mode
-    // but it could also be the source itself
+    // 在我们的例子中,由解码器来决定 changelog 支持的模式
+    // 但是在 source 端指定也可以
     return decodingFormat.getChangelogMode();
   }
 
   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 
-    // create runtime classes that are shipped to the cluster
+    // 创建运行时类用于提交给集群
 
     final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
       runtimeProviderContext,
@@ -631,8 +592,7 @@ public class SocketDynamicTableSource implements ScanTableSource {
 
 **`ChangelogCsvFormat`**
 
-The `ChangelogCsvFormat` is a decoding format that uses a `DeserializationSchema` during runtime. It
-supports emitting `INSERT` and `DELETE` changes.
+`ChangelogCsvFormat` 在运行时使用 `DeserializationSchema` 为数据进行解码,这里支持处理 `INSERT`、`DELETE` 变更类型的数据。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -659,24 +619,24 @@ public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<
   public DeserializationSchema<RowData> createRuntimeDecoder(
       DynamicTableSource.Context context,
       DataType producedDataType) {
-    // create type information for the DeserializationSchema
+    // 为 DeserializationSchema 创建类型信息
     final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
       producedDataType);
 
-    // most of the code in DeserializationSchema will not work on internal data structures
-    // create a converter for conversion at the end
+    // DeserializationSchema 中的大多数代码无法处理内部数据结构
+    // 在最后为转换创建一个转换器
     final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
 
-    // use logical types during runtime for parsing
+    // 在运行时,为解析过程提供逻辑类型
     final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
 
-    // create runtime class
+    // 创建运行时类
     return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
   }
 
   @Override
   public ChangelogMode getChangelogMode() {
-    // define that this format can produce INSERT and DELETE rows
+    // 支持处理 `INSERT`、`DELETE` 变更类型的数据。
     return ChangelogMode.newBuilder()
       .addContainedKind(RowKind.INSERT)
       .addContainedKind(RowKind.DELETE)
@@ -685,14 +645,15 @@ public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<
 }
 ```
 
-### Runtime
+<a name="runtime-1"></a>
+
+### 运行时
 
-For completeness, this section illustrates the runtime logic for both `SourceFunction` and `DeserializationSchema`.
+为了让例子更完整,这部分主要介绍 `SourceFunction` 和 `DeserializationSchema` 的运行逻辑。
 
 **ChangelogCsvDeserializer**
 
-The `ChangelogCsvDeserializer` contains a simple parsing logic for converting bytes into `Row` of `Integer`
-and `String` with a row kind. The final conversion step converts those into internal data structures.
+`ChangelogCsvDeserializer` 的解析逻辑比较简单:将字节流数据解析为由 `Integer` 和 `String` 组成的 `Row` 类型,并附带这条数据的操作类型,最后将其转换为内部数据结构。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -725,26 +686,26 @@ public class ChangelogCsvDeserializer implements DeserializationSchema<RowData>
 
   @Override
   public TypeInformation<RowData> getProducedType() {
-    // return the type information required by Flink's core interfaces
+    // 为 Flink 的核心接口提供类型信息。
     return producedTypeInfo;
   }
 
   @Override
   public void open(InitializationContext context) {
-    // converters must be open
+    // 转化器必须要被开启。
     converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
   }
 
   @Override
   public RowData deserialize(byte[] message) {
-    // parse the columns including a changelog flag
+    // 按列解析数据,其中一列是 changelog 标记
     final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
     final RowKind kind = RowKind.valueOf(columns[0]);
     final Row row = new Row(kind, parsingTypes.size());
     for (int i = 0; i < parsingTypes.size(); i++) {
       row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
     }
-    // convert to internal data structure
+    // 转换为内部数据结构
     return (RowData) converter.toInternal(row);
   }
 
@@ -768,9 +729,7 @@ public class ChangelogCsvDeserializer implements DeserializationSchema<RowData>
 
 **SocketSourceFunction**
 
-The `SocketSourceFunction` opens a socket and consumes bytes. It splits records by the given byte
-delimiter (`\n` by default) and delegates the decoding to a pluggable `DeserializationSchema`. The
-source function can only work with a parallelism of 1.
+`SocketSourceFunction` 会监听一个 socket 端口并持续消费字节流。它会按照给定的分隔符拆分每条记录,并由插件化的 `DeserializationSchema` 进行解码。目前这个 source 功能只支持平行度为 1。
 
 ```java
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -810,7 +769,7 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
   @Override
   public void run(SourceContext<RowData> ctx) throws Exception {
     while (isRunning) {
-      // open and consume from socket
+      // 持续从 socket 消费数据
       try (final Socket socket = new Socket()) {
         currentSocket = socket;
         socket.connect(new InetSocketAddress(hostname, port), 0);
@@ -818,11 +777,11 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
           ByteArrayOutputStream buffer = new ByteArrayOutputStream();
           int b;
           while ((b = stream.read()) >= 0) {
-            // buffer until delimiter
+            // 持续写入 buffer 直到遇到分隔符
             if (b != byteDelimiter) {
               buffer.write(b);
             }
-            // decode and emit record
+            // 解码并处理记录
             else {
               ctx.collect(deserializer.deserialize(buffer.toByteArray()));
               buffer.reset();
@@ -830,7 +789,7 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
           }
         }
       } catch (Throwable t) {
-        t.printStackTrace(); // print and continue
+        t.printStackTrace(); // 打印并继续
       }
       Thread.sleep(1000);
     }
@@ -842,7 +801,7 @@ public class SocketSourceFunction extends RichSourceFunction<RowData> implements
     try {
       currentSocket.close();
     } catch (Throwable t) {
-      // ignore
+      // 忽略
     }
   }
 }