Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/07 15:25:56 UTC

flink git commit: [FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

Repository: flink
Updated Branches:
  refs/heads/master b5caaef82 -> cba85db64


[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

This closes #3282.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cba85db6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cba85db6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cba85db6

Branch: refs/heads/master
Commit: cba85db64bf88fb107154f0f869d708c3d6fca24
Parents: b5caaef
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Feb 7 14:32:28 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Feb 7 23:24:57 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md | 138 ++++++++++++++++++++++++++++----------
 1 file changed, 103 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cba85db6/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6a58b7a..a727f85 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
 
 Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
@@ -84,14 +87,14 @@ Then, import the connector in your maven project:
 
 Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
 
-### Installing Apache Kafka
+## Installing Apache Kafka
 
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
-### Kafka Consumer
+## Kafka Consumer
 
-Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 0.9.0.x versions). It provides access to one or more Kafka topics.
+Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 0.9.0.x versions, etc.). It provides access to one or more Kafka topics.
 
 The constructor accepts the following arguments:
 
@@ -137,7 +140,7 @@ for querying the list of topics and partitions.
 For this to work, the client needs to be able to access the Kafka brokers from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.
 
-#### The `DeserializationSchema`
+### The `DeserializationSchema`
 
 The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
@@ -161,7 +164,7 @@ For convenience, Flink provides the following schemas:
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
     an optional "metadata" field that exposes the offset/partition/topic for this message.
 
-#### Kafka Consumers and Fault Tolerance
+### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
 its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
@@ -193,7 +196,7 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
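
For example, checkpointing is enabled on the execution environment as follows (the 5000 ms interval is chosen purely for illustration):

{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
{% endhighlight %}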
 
-#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+### Kafka Consumers and Timestamp Extraction/Watermark Emission
 
 In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
 In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
@@ -248,59 +251,124 @@ the `Watermark getCurrentWatermark()` (for periodic) or the
 if a new watermark should be emitted and with which timestamp.
 
 
-### Kafka Producer
+## Kafka Producer
 
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
 
 Example:
 
-
 <div class="codetabs" markdown="1">
 <div data-lang="java, Kafka 0.8+" markdown="1">
 {% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+        "localhost:9092",            // broker list
+        "my-topic",                  // target topic
+        new SimpleStringSchema());   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false);   // "false" by default
+myProducer.setFlushOnCheckpoint(true);  // "false" by default
+
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
 <div data-lang="java, Kafka 0.10+" markdown="1">
 {% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                     // input stream
+        "my-topic",                 // target topic
+        new SimpleStringSchema(),   // serialization schema
+        properties);                // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false);   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.8+" markdown="1">
 {% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+        "localhost:9092",         // broker list
+        "my-topic",               // target topic
+        new SimpleStringSchema)   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false)   // "false" by default
+myProducer.setFlushOnCheckpoint(true)  // "false" by default
+
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 <div data-lang="scala, Kafka 0.10+" markdown="1">
 {% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                   // input stream
+        "my-topic",               // target topic
+        new SimpleStringSchema,   // serialization schema
+        properties)               // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false)   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
 {% endhighlight %}
 </div>
 </div>
 
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
-
-**Note**: By default, the number of retries is set to "0". This means that the producer fails immediately on errors,
-including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic.
-For most production environments with frequent broker changes, we recommend setting the number of retries to a
-higher value.
-
-There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
+The above examples demonstrate the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usage, there
+are other constructor variants that allow providing the following:
+
+ * *Providing custom properties*:
+ The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+ Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+ details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+ partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ constructor. This partitioner will be called for each record in the stream
+ to determine which exact partition the record will be sent to.
+ * *Advanced serialization schema*: Similar to the consumer,
+ the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
+ which allows serializing the key and value separately. It also allows overriding the target topic,
+ so that one producer instance can send data to multiple topics (a minimal sketch follows below).
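
As a minimal sketch of such a `KeyedSerializationSchema` (the class name `KeyedTupleSchema` and the `Tuple2<String, String>` element type are assumptions made purely for illustration):

{% highlight java %}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class KeyedTupleSchema implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        // first tuple field becomes the Kafka message key
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        // second tuple field becomes the Kafka message value
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        // null means "use the topic the producer was constructed with"
        return null;
    }
}
{% endhighlight %}

Returning a non-null topic name from `getTargetTopic(...)` routes that particular record to that topic instead of the producer's default.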
+ 
+### Kafka Producers and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Kafka Producer can provide
+at-least-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you should also call the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` with appropriate values,
+as shown in the examples in the previous section:
+
+ * `setLogFailuresOnly(boolean)`: enabling this will let the producer only log failures
+ instead of catching and rethrowing them. This essentially treats a record
+ as having succeeded, even if it was never written to the target Kafka topic. This
+ must be disabled for at-least-once.
+ * `setFlushOnCheckpoint(boolean)`: with this enabled, a checkpoint will wait for any
+ records still in flight at the time of the checkpoint to be acknowledged by Kafka before
+ the checkpoint succeeds. This ensures that all records before the checkpoint have
+ been written to Kafka. This must be enabled for at-least-once.
+
+**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
+the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
+duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
+we recommend setting the number of retries to a higher value.
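
For example, the retry count can be raised via the custom properties passed to one of the producer constructor variants mentioned above; the broker address and the value `3` below are placeholders:

{% highlight java %}
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092");
// let the internal KafkaProducer retry failed sends, e.g. across leader changes
producerProperties.setProperty("retries", "3");

FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
        "my-topic",                  // target topic
        new SimpleStringSchema(),    // serialization schema
        producerProperties);         // custom configuration for the internal KafkaProducer
{% endhighlight %}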
+
+**Note**: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
 into a Kafka topic.
 
-### Using Kafka timestamps and Flink event time in Kafka 0.10
+## Using Kafka timestamps and Flink event time in Kafka 0.10
 
-Since Apache Kafka 0.10., Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
+Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
 the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
 has been written to the Kafka broker.
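
As a sketch (assuming a `properties` object configured as in the consumer examples above), switching the environment to event time lets downstream operators use the timestamps that the 0.10 consumer attaches to each record; watermarks still need to be generated, e.g. with an assigner as described in the consumer section:

{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// the 0.10 consumer emits each element with the Kafka record's timestamp attached
FlinkKafkaConsumer010<String> kafkaSource =
    new FlinkKafkaConsumer010<String>("my-topic", new SimpleStringSchema(), properties);

DataStream<String> stream = env.addSource(kafkaSource);
{% endhighlight %}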
 
@@ -331,7 +399,7 @@ config.setWriteTimestampToKafka(true);
 
 
 
-### Kafka Connector metrics
+## Kafka Connector metrics
 
 Flink's Kafka connectors provide some metrics through Flink's [metrics system]({{ site.baseurl }}/monitoring/metrics.html) to analyze
 the behavior of the connector.
@@ -354,7 +422,7 @@ the committed offset and the most recent offset in each partition is called the
 the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind.
 For large production deployments we recommend monitoring that metric to avoid increasing latency.
 
-### Enabling Kerberos Authentication (for versions 0.9+ and above only)
+## Enabling Kerberos Authentication (for versions 0.9 and above only)
 
 Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation
 configured for Kerberos. Simply configure Flink in `flink-conf.yaml` to enable Kerberos authentication for Kafka like so: