Posted to commits@flink.apache.org by rm...@apache.org on 2015/09/02 16:07:41 UTC

flink git commit: [hotfix][docs] Update Kafka section in streaming guide to match the renamed class names

Repository: flink
Updated Branches:
  refs/heads/master 5d98e77cd -> d09f0027b


[hotfix][docs] Update Kafka section in streaming guide to match the renamed class names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d09f0027
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d09f0027
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d09f0027

Branch: refs/heads/master
Commit: d09f0027bddefa7d3f3f73dbf6f26926338cf712
Parents: 5d98e77
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Sep 2 16:05:25 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Sep 2 16:06:27 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md | 58 +++++++++++++++++----------------------
 1 file changed, 25 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d09f0027/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index be7e761..081c602 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1299,7 +1299,7 @@ The [docs on streaming fault tolerance](../internals/stream_checkpointing.html)
 Stateful computation
 ------------
 
-Flink supports the checkpointing and persistence of user defined operator states, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.
+Flink supports the checkpointing and persistence of user-defined operator state, so in case of a failure this state can be restored to the latest checkpoint and processing will continue from there. This gives exactly-once processing semantics with respect to the operator state when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `FlinkKafkaConsumer` provides this stateful functionality for reading streams from Kafka.
 
 ### OperatorState
 
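The pattern described above, a source keeping its current read position as its operator state, can be sketched as follows. This is a hypothetical example for illustration, not part of this commit; it uses the `SourceFunction` and `Checkpointed` interfaces, and the class name and record format are made up:

{% highlight java %}
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical source that keeps its current offset as its OperatorState.
public class OffsetTrackingSource implements SourceFunction<String>, Checkpointed<Long> {

	private volatile boolean running = true;
	private long offset = 0L; // the operator state: current read position

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (running) {
			// Hold the checkpoint lock so that emitting a record and
			// advancing the offset are atomic with respect to checkpoints.
			synchronized (ctx.getCheckpointLock()) {
				ctx.collect("record-" + offset);
				offset++;
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}

	@Override
	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
		return offset; // persisted with every successful checkpoint
	}

	@Override
	public void restoreState(Long state) {
		offset = state; // after a failure, resume from the latest checkpoint
	}
}
{% endhighlight %}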
@@ -1559,42 +1559,54 @@ Note that the streaming connectors are currently not part of the binary distribu
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
 #### Kafka Source
-The standard `KafkaSource` is a Kafka consumer providing access to one topic.
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic.
 
-The following parameters have to be provided for the `KafkaSource(...)` constructor:
+The following parameters have to be provided for the `FlinkKafkaConsumer082(...)` constructor:
 
-1. Zookeeper hostname
-2. The topic name
-3. Deserialization schema
+1. The topic name
+2. A DeserializationSchema
+3. Properties for the Kafka consumer.
+  The following properties are required:
+  - "bootstrap.servers" (comma separated list of Kafka brokers)
+  - "zookeeper.connect" (comma separated list of Zookeeper servers)
+  - "group.id" the id of the consumer group
 
 Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-	.addSource(new KafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()))
-	.print();
+	.addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
+stream.print();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
-stream = env
-    .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema)
-    .print
+val stream = env
+    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
+stream.print()
 {% endhighlight %}
 </div>
 </div>
 
-#### Persistent Kafka Source
-As Kafka persists all the data, a fault tolerant Kafka source can be provided.
+#### Kafka Consumers and Fault Tolerance
+As Kafka persists all the data, a fault-tolerant Kafka consumer can be provided.
 
-The PersistentKafkaSource can read a topic, and if the job fails for some reason, the source will
+The `FlinkKafkaConsumer082` can read a topic, and if the job fails for some reason, the source will
 continue reading from where it left off after a restart.
 For example, if there are 3 partitions in the topic with offsets 31, 122, and 110 read at the time of job
 failure, then on restart it will continue reading from those offsets, regardless of whether these partitions have received new messages.
 
-To use fault tolerant Kafka Sources, monitoring of the topology needs to be enabled at the execution environment:
+To use fault-tolerant Kafka consumers, checkpointing of the topology needs to be enabled on the execution environment:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
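For reference, enabling checkpointing is a single call on the execution environment; a minimal sketch (the 5000 ms interval is an assumed example value, not taken from this commit):

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Draw a consistent snapshot of the topology every 5 seconds.
env.enableCheckpointing(5000);
{% endhighlight %}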
@@ -1609,31 +1621,11 @@ Also note that Flink can only restart the topology if enough processing slots ar
 So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
-The following arguments have to be provided for the `PersistentKafkaSource(...)` constructor:
-
-1. Zookeeper hostname
-2. The topic name
-3. Deserialization schema
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-stream.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
 
 #### Kafka Sink
 A class providing an interface for sending data to Kafka. 
 
-The followings have to be provided for the `KafkaSink(…)` constructor in order:
+The following arguments have to be provided for the `KafkaSink(…)` constructor in order:
 
 1. Broker address (in hostname:port format, can be a comma-separated list)
 2. The topic name
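Putting the sink to use might look like the following sketch (the broker address and topic are example values, and the trailing serialization schema argument is an assumption by analogy with the consumer example above, as the excerpt ends before listing it):

{% highlight java %}
// Sketch: broker list and topic name are example values; the SimpleStringSchema
// argument is assumed by analogy with the consumer example above.
stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
{% endhighlight %}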