Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/11/01 12:56:32 UTC

[GitHub] [flink] wuchong commented on a change in pull request #9764: [FLINK-12939][docs-zh] Translate "Apache Kafka Connector" page into Chinese

wuchong commented on a change in pull request #9764: [FLINK-12939][docs-zh] Translate "Apache Kafka Connector" page into Chinese
URL: https://github.com/apache/flink/pull/9764#discussion_r341560787
 
 

 ##########
 File path: docs/dev/connectors/kafka.zh.md
 ##########
 @@ -577,171 +474,101 @@ stream.addSink(myProducer);
 val stream: DataStream[String] = ...
 
 val myProducer = new FlinkKafkaProducer011[String](
-        "localhost:9092",         // broker list
-        "my-topic",               // target topic
-        new SimpleStringSchema)   // serialization schema
+        "localhost:9092",         // broker 列表
+        "my-topic",               // 目标 topic
+        new SimpleStringSchema)   // 序列化 schema
 
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
-// this method is not available for earlier Kafka versions
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true)
 
 stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
 
-The above examples demonstrate the basic usage of creating a Flink Kafka Producer
-to write streams to a single Kafka target topic. For more advanced usages, there
-are other constructor variants that allow providing the following:
-
- * *Providing custom properties*:
- The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
- Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
- details on how to configure Kafka Producers.
- * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
- constructor. This partitioner will be called for each record in the stream
- to determine which exact partition of the target topic the record should be sent to.
- Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
- * *Advanced serialization schema*: Similar to the consumer,
- the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
 which allows serializing the key and value separately. It also allows overriding the target topic,
- so that one producer instance can send data to multiple topics.
-
-### Kafka Producer Partitioning Scheme
-
-By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
-a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
-(i.e., all records received by a sink subtask will end up in the same Kafka partition).
-
-A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
-Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
-Note that the partitioner implementation must be serializable, as they will be transferred across Flink nodes.
-Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner
-is not part of the producer's checkpointed state.
-
-It is also possible to completely avoid using any kind of partitioner, and simply let Kafka partition
-the written records by their attached key (as determined for each record using the provided serialization schema).
-To do this, provide a `null` custom partitioner when instantiating the producer. It is important
-to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified
-the `FlinkFixedPartitioner` is used instead.
-
-### Kafka Producers and Fault Tolerance
+The above examples demonstrate the basic usage of creating a Flink Kafka Producer to write streams to a single Kafka target topic.
+For more advanced usages, there are other constructor variants that allow providing the following:
+
 * *Providing custom properties*: The producer allows providing a custom properties configuration for the internal `KafkaProducer`. Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure Kafka Producers. A configuration sketch follows this list.
 * *Custom partitioner*: To assign records to specific partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the constructor. This partitioner will be called for each record in the stream to determine which exact partition of the target topic the record should be sent to. Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
 * *Advanced serialization schema*: Similar to the consumer, the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`, which allows serializing the key and value separately. It also allows overriding the target topic, so that one producer instance can send data to multiple topics.
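+
+A minimal sketch of the custom-properties option is shown below. It assumes the `FlinkKafkaProducer011` constructor variant that takes a topic, a `SerializationSchema`, and a `Properties` object; the topic name and the chosen Kafka settings are purely illustrative:
+
+{% highlight scala %}
+import java.util.Properties
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
+
+// custom properties are handed straight to the internal KafkaProducer
+val props = new Properties()
+props.setProperty("bootstrap.servers", "localhost:9092")
+props.setProperty("batch.size", "16384")        // illustrative Kafka producer setting
+props.setProperty("compression.type", "lz4")    // illustrative Kafka producer setting
+
+val producerWithCustomConfig = new FlinkKafkaProducer011[String](
+  "my-topic",
+  new SimpleStringSchema,
+  props)
+{% endhighlight %}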
+
+### Kafka Producer Partitioning Scheme
+
+By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition (i.e., all records received by a sink subtask will end up in the same Kafka partition).
+
+A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. The constructors for all Kafka versions allow providing a custom partitioner when instantiating the producer.
+Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes. Also, keep in mind that any state in the partitioner will be lost on job failures, since the partitioner is not part of the producer's checkpointed state.
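+
+Below is a minimal sketch of such a partitioner. It assumes `FlinkKafkaPartitioner` exposes a single abstract `partition(record, key, value, targetTopic, partitions)` method; the class name and routing logic are purely illustrative:
+
+{% highlight scala %}
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
+
+// illustrative partitioner: route each record by the hash of its payload
+class PayloadHashPartitioner extends FlinkKafkaPartitioner[String] {
+  override def partition(
+      record: String,
+      key: Array[Byte],
+      value: Array[Byte],
+      targetTopic: String,
+      partitions: Array[Int]): Int = {
+    // `partitions` holds the partition ids available for the target topic
+    partitions(Math.floorMod(record.hashCode, partitions.length))
+  }
+}
+{% endhighlight %}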
+
+It is also possible to avoid using any kind of partitioner entirely and simply let Kafka partition the written records by their attached key (as determined for each record using the provided serialization schema).
+To do this, provide a `null` custom partitioner when instantiating the producer. It is important to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified, the `FlinkFixedPartitioner` is used instead.
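+
+A hedged sketch of both options, reusing `props` and the `PayloadHashPartitioner` from the sketches above and assuming the 0.11 constructor variant that accepts a `java.util.Optional[FlinkKafkaPartitioner]` (older producers take the partitioner directly, with `null` falling back to Kafka's own partitioning):
+
+{% highlight scala %}
+import java.util.Optional
+
+// route records with the custom partitioner defined above ...
+val producerWithCustomPartitioner = new FlinkKafkaProducer011[String](
+  "my-topic",
+  new SimpleStringSchema,
+  props,
+  Optional.of[FlinkKafkaPartitioner[String]](new PayloadHashPartitioner))
+
+// ... or pass no partitioner at all and let Kafka partition by the attached key
+val producerWithKafkaPartitioning = new FlinkKafkaProducer011[String](
+  "my-topic",
+  new SimpleStringSchema,
+  props,
+  Optional.empty[FlinkKafkaPartitioner[String]]())
+{% endhighlight %}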
+
+### Kafka Producers and Fault Tolerance
 
 #### Kafka 0.8
 
-Before 0.9 Kafka did not provide any mechanisms to guarantee at-least-once or exactly-once semantics.
+Before 0.9, Kafka did not provide any mechanisms to guarantee at-least-once or exactly-once semantics.
 
 #### Kafka 0.9 and 0.10
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaProducer010`
-can provide at-least-once delivery guarantees.
-
-Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
-
- * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
- Enabling this will let the producer only log failures
- instead of catching and rethrowing them. This essentially accounts the record
- to have succeeded, even if it was never written to the target Kafka topic. This
- must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
- With this enabled, Flink's checkpoints will wait for any
- on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
- succeeding the checkpoint. This ensures that all records before the checkpoint have
- been written to Kafka. This must be enabled for at-least-once.
-
-In conclusion, the Kafka producer by default has at-least-once guarantees for versions
-0.9 and 0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
-to `true`.
-
-**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
-the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
-duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
-we recommend setting the number of retries to a higher value.
-
-**Note**: There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
-into a Kafka topic.
-
-#### Kafka 0.11 and newer
-
-With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide
-exactly-once delivery guarantees.
-
-Besides enabling Flink's checkpointing, you can also choose three different modes of operating
-chosen by passing appropriate `semantic` parameter to the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions):
-
- * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or they can
- be duplicated.
- * `Semantic.AT_LEAST_ONCE` (default setting): similar to `setFlushOnCheckpoint(true)` in
- `FlinkKafkaProducer010`. This guarantees that no records will be lost (although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once semantic. Whenever you write
- to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_committed`
- or `read_uncommitted` - the latter one is the default value) for any application consuming records
- from Kafka.
-
-##### Caveats
-
-`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions
-that were started before taking a checkpoint, after recovering from the said checkpoint. If the time
-between Flink application crash and completed restart is larger than Kafka's transaction timeout
-there will be data loss (Kafka will automatically abort transactions that exceeded timeout time).
-Having this in mind, please configure your transaction timeout appropriately to your expected down
-times.
-
-Kafka brokers by default have `transaction.max.timeout.ms` set to 15 minutes. This property will
-not allow setting transaction timeouts for the producers larger than its value.
-`FlinkKafkaProducer011` by default sets the `transaction.timeout.ms` property in producer config to
-1 hour, thus `transaction.max.timeout.ms` should be increased before using the
-`Semantic.EXACTLY_ONCE` mode.
-
-In `read_committed` mode of `KafkaConsumer`, any transactions that were not finished
-(neither aborted nor completed) will block all reads from the given Kafka topic past any
-un-finished transaction. In other words, after the following sequence of events:
-
-1. User started `transaction1` and wrote some records using it
-2. User started `transaction2` and wrote some further records using it
-3. User committed `transaction2`
-
-Even if records from `transaction2` are already committed, they will not be visible to
-the consumers until `transaction1` is committed or aborted. This has two implications:
-
- * First of all, during normal working of Flink applications, user can expect a delay in visibility
- of the records produced into Kafka topics, equal to average time between completed checkpoints.
- * Secondly in case of Flink application failure, topics into which this application was writing,
- will be blocked for the readers until the application restarts or the configured transaction 
- timeout time will pass. This remark only applies for the cases when there are multiple
- agents/applications writing to the same Kafka topic.
-
-**Note**:  `Semantic.EXACTLY_ONCE` mode uses a fixed size pool of KafkaProducers
-per each `FlinkKafkaProducer011` instance. One of those producers is used per
-checkpoint. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer011`
-will throw an exception and will fail the whole application. Please configure max pool size and max
-number of concurrent checkpoints accordingly.
-
-**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures to not leave any lingering transactions
-that would block the consumers from reading from the Kafka topic more than necessary. However, in the
-event of failure of Flink application before first checkpoint, after restarting such application there
-is no information in the system about previous pool sizes. Thus it is unsafe to scale down Flink
-application before first checkpoint completes, by factor larger than `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
-
-## Using Kafka timestamps and Flink event time in Kafka 0.10
-
-Since Apache Kafka 0.10+, Kafka's messages can carry
-[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
-the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
-has been written to the Kafka broker.
-
-The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time characteristic in Flink is 
-set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
-
-The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in 
-"Kafka Consumers and Timestamp Extraction/Watermark Emission"  using the `assignTimestampsAndWatermarks` method are applicable.
-
-There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of 
-the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
-
-A timestamp extractor for a Kafka consumer would look like this:
+With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaProducer010` can provide at-least-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you should also configure the setter methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
+
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`. Enabling it makes the producer only log failures instead of catching and rethrowing them. This essentially treats a record as having succeeded, even if it was never written to the target Kafka topic. It must be disabled for at-least-once delivery.
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`. With this enabled, Flink's checkpoints wait for any in-flight records at the time of the checkpoint to be acknowledged by Kafka before the checkpoint succeeds. This ensures that all records before the checkpoint have been written to Kafka. It must be enabled for at-least-once delivery.
+
+In conclusion, with `setLogFailuresOnly` set to `false` and `setFlushOnCheckpoint` set to `true`, the Kafka producer provides at-least-once guarantees by default for versions 0.9 and 0.10. A configuration sketch is shown below.
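+
+A minimal sketch of this configuration, assuming the `(topic, serialization schema, properties)` constructor variant of `FlinkKafkaProducer010`; the topic name and property values are illustrative:
+
+{% highlight scala %}
+import java.util.Properties
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
+
+val props = new Properties()
+props.setProperty("bootstrap.servers", "localhost:9092")
+// see the note below: the default of 0 retries fails fast on errors such as leader changes
+props.setProperty("retries", "3")
+
+val producer = new FlinkKafkaProducer010[String]("my-topic", new SimpleStringSchema, props)
+
+// at-least-once requires rethrowing failures and flushing on checkpoints
+producer.setLogFailuresOnly(false)   // the default, shown here for clarity
+producer.setFlushOnCheckpoint(true)  // the default, shown here for clarity
+{% endhighlight %}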
+
+**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`, the producer fails immediately on errors, including leader changes. The value defaults to "0" to avoid duplicate messages in the target topic caused by retries. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.
+
+**Note**: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery into a Kafka topic.
+
+#### Kafka 0.11 and newer
+
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose among three different modes of operation by passing the appropriate `semantic` parameter to the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions):
+
+ * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or duplicated.
+ * `Semantic.AT_LEAST_ONCE` (default setting): similar to `setFlushOnCheckpoint(true)` in `FlinkKafkaProducer010`. This guarantees that no records will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget to set the desired `isolation.level` (`read_committed` or `read_uncommitted`; the latter is the default value) for any application consuming records from Kafka. A sketch of this mode follows the list.
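+
+A hedged sketch of enabling `Semantic.EXACTLY_ONCE`, assuming the constructor variant that takes a `KeyedSerializationSchema`, producer `Properties`, and a `Semantic`; the topic name and broker address are illustrative:
+
+{% highlight scala %}
+import java.util.Properties
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
+
+val props = new Properties()
+props.setProperty("bootstrap.servers", "localhost:9092")
+// see the caveats below: transaction.timeout.ms / transaction.max.timeout.ms usually need attention
+
+val exactlyOnceProducer = new FlinkKafkaProducer011[String](
+  "my-topic",
+  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
+  props,
+  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
+{% endhighlight %}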
+
+##### Caveats
+
+`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from that checkpoint. If the time between a Flink application crash and the completed restart is larger than Kafka's transaction timeout, there will be data loss (Kafka automatically aborts transactions that exceed the timeout). With this in mind, please configure your transaction timeout appropriately for your expected downtimes.
+
+By default, Kafka brokers have `transaction.max.timeout.ms` set to 15 minutes. This property does not allow setting transaction timeouts for the producers larger than its value.
+By default, `FlinkKafkaProducer011` sets the `transaction.timeout.ms` property in the producer config to 1 hour, so `transaction.max.timeout.ms` should be increased before using the `Semantic.EXACTLY_ONCE` mode.
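+
+For example, one possible pairing of the two settings (values are illustrative; `props` is the producer `Properties` object from the sketch above):
+
+{% highlight scala %}
+// broker side (server.properties): raise the upper bound first
+//   transaction.max.timeout.ms=3600000
+
+// producer side: keep transaction.timeout.ms at or below the broker limit
+props.setProperty("transaction.timeout.ms", "3600000")   // 1 hour
+{% endhighlight %}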
+
+In the `read_committed` mode of `KafkaConsumer`, any transactions that were not finished (neither aborted nor completed) will block all reads from the given Kafka topic past any unfinished transaction.
+In other words, after the following sequence of events:
+
+1. The user started `transaction1` and wrote some records using it
+2. The user started `transaction2` and wrote some further records using it
+3. The user committed `transaction2`
+
+Even though the records from `transaction2` are already committed, they will not be visible to consumers until `transaction1` is committed or aborted. This has two implications:
+
+ * First, during normal operation of a Flink application, users can expect a delay in the visibility of records produced into Kafka topics, equal to the average time between completed checkpoints.
+ * Second, in case of a Flink application failure, the topics this application was writing to will be blocked for readers until the application restarts or the configured transaction timeout passes. This remark only applies to cases where multiple agents/applications write to the same Kafka topic.
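+
+For completeness, here is a minimal consumer-side sketch of the `read_committed` setting discussed above; the group id and topic name are illustrative:
+
+{% highlight scala %}
+import java.util.Properties
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
+
+val consumerProps = new Properties()
+consumerProps.setProperty("bootstrap.servers", "localhost:9092")
+consumerProps.setProperty("group.id", "my-group")
+// only read records of committed transactions; the Kafka default is read_uncommitted
+consumerProps.setProperty("isolation.level", "read_committed")
+
+val consumer = new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema, consumerProps)
+{% endhighlight %}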
+
+**Note**: `Semantic.EXACTLY_ONCE` mode uses a fixed-size pool of KafkaProducers per `FlinkKafkaProducer011` instance. Each checkpoint uses one producer from that pool. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer011` will throw an exception and fail the whole application. Please configure the maximum pool size and the maximum number of concurrent checkpoints accordingly.
+
+**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures to not leave any lingering transactions that would block consumers from reading from the Kafka topic longer than necessary. However, in the event of a Flink application failure before the first checkpoint, there is no information in the system about the previous pool sizes after such an application is restarted. Thus it is unsafe to scale down a Flink application by a factor larger than `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR` before the first checkpoint completes.
 
 Review comment:
   I will improve this a bit and merge.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services