Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 05:51:56 UTC

[03/11] flink git commit: [FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs

[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b49ead38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b49ead38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b49ead38

Branch: refs/heads/master
Commit: b49ead387a30ff0d4f204e82fd8430012380eb77
Parents: 7b5fdbd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 15:21:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:14:20 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md | 46 ++++++++++-----------------------------
 1 file changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b49ead38/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index daf1903..c6461f9 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -447,17 +447,17 @@ if a new watermark should be emitted and with which timestamp.
 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc.).
 It allows writing a stream of records to one or more Kafka topics.
 
 Example:
 
 <div class="codetabs" markdown="1">
-<div data-lang="java, Kafka 0.8+" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> stream = ...;
 
-FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "localhost:9092",            // broker list
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
@@ -466,29 +466,17 @@ FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
 myProducer.setLogFailuresOnly(false);   // "false" by default
 myProducer.setFlushOnCheckpoint(true);  // "false" by default
 
-stream.addSink(myProducer);
-{% endhighlight %}
-</div>
-<div data-lang="java, Kafka 0.10+" markdown="1">
-{% highlight java %}
-DataStream<String> stream = ...;
-
-FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                     // input stream
-        "my-topic",                 // target topic
-        new SimpleStringSchema(),   // serialization schema
-        properties);                // custom configuration for KafkaProducer (including broker list)
+// versions 0.10+ allow attaching the records' event timestamps when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true);
 
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false);   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
-<div data-lang="scala, Kafka 0.8+" markdown="1">
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
-val myProducer = new FlinkKafkaProducer08[String](
+val myProducer = new FlinkKafkaProducer011[String](
         "localhost:9092",         // broker list
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
@@ -497,22 +485,10 @@ val myProducer = new FlinkKafkaProducer08[String](
 myProducer.setLogFailuresOnly(false)   // "false" by default
 myProducer.setFlushOnCheckpoint(true)  // "false" by default
 
-stream.addSink(myProducer)
-{% endhighlight %}
-</div>
-<div data-lang="scala, Kafka 0.10+" markdown="1">
-{% highlight scala %}
-val stream: DataStream[String] = ...
+// versions 0.10+ allow attaching the records' event timestamps when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true)
 
-val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                   // input stream
-        "my-topic",               // target topic
-        new SimpleStringSchema,   // serialization schema
-        properties)               // custom configuration for KafkaProducer (including broker list)
-
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false)   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
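
----------------------------------------------------------------------

The removed 0.10 snippets were the only examples that passed a custom
KafkaProducer configuration (including the broker list) via `properties`;
the new snippets use the broker-list convenience constructor instead. The
same is still possible with the non-deprecated API. Here is a minimal
sketch, assuming the Properties-based constructor of `FlinkKafkaProducer011`
and the Flink 1.4-era package layout (class name, topic, and elements are
illustrative):

import java.util.Properties;

// since Flink 1.4, SimpleStringSchema lives under api.common.serialization;
// older examples import it from streaming.util.serialization
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaProducerWithProperties {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties properties = new Properties();
        // the broker list travels in the producer config
        // instead of being a separate constructor argument
        properties.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                "my-topic",                 // target topic
                new SimpleStringSchema(),   // serialization schema
                properties);                // custom configuration for KafkaProducer

        // versions 0.10+ allow attaching the records' event timestamps
        myProducer.setWriteTimestampToKafka(true);

        stream.addSink(myProducer);
        env.execute("kafka-producer-properties-sketch");
    }
}

Any other Kafka producer property (acks, retries, compression.type, ...) can
be set on `properties` in the same way, which is what the removed
`writeToKafkaWithTimestamps` examples used it for.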