You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 05:51:56 UTC
[03/11] flink git commit: [FLINK-8260] [kafka] Fix usage of
deprecated instantiation methods in FlinkKafkaProducer docs
[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b49ead38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b49ead38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b49ead38
Branch: refs/heads/master
Commit: b49ead387a30ff0d4f204e82fd8430012380eb77
Parents: 7b5fdbd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 15:21:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:14:20 2018 -0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 46 ++++++++++-----------------------------
1 file changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b49ead38/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index daf1903..c6461f9 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -447,17 +447,17 @@ if a new watermark should be emitted and with which timestamp.
## Kafka Producer
-Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc.).
It allows writing a stream of records to one or more Kafka topics.
Example:
<div class="codetabs" markdown="1">
-<div data-lang="java, Kafka 0.8+" markdown="1">
+<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> stream = ...;
-FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
@@ -466,29 +466,17 @@ FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
myProducer.setLogFailuresOnly(false); // "false" by default
myProducer.setFlushOnCheckpoint(true); // "false" by default
-stream.addSink(myProducer);
-{% endhighlight %}
-</div>
-<div data-lang="java, Kafka 0.10+" markdown="1">
-{% highlight java %}
-DataStream<String> stream = ...;
-
-FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
- stream, // input stream
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties); // custom configuration for KafkaProducer (including broker list)
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true);
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false); // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
+stream.addSink(myProducer);
{% endhighlight %}
</div>
-<div data-lang="scala, Kafka 0.8+" markdown="1">
+<div data-lang="scala" markdown="1">
{% highlight scala %}
val stream: DataStream[String] = ...
-val myProducer = new FlinkKafkaProducer08[String](
+val myProducer = new FlinkKafkaProducer011[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
@@ -497,22 +485,10 @@ val myProducer = new FlinkKafkaProducer08[String](
myProducer.setLogFailuresOnly(false) // "false" by default
myProducer.setFlushOnCheckpoint(true) // "false" by default
-stream.addSink(myProducer)
-{% endhighlight %}
-</div>
-<div data-lang="scala, Kafka 0.10+" markdown="1">
-{% highlight scala %}
-val stream: DataStream[String] = ...
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true)
-val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
- stream, // input stream
- "my-topic", // target topic
- new SimpleStringSchema, // serialization schema
- properties) // custom configuration for KafkaProducer (including broker list)
-
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false) // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
+stream.addSink(myProducer)
{% endhighlight %}
</div>
</div>