You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/19 14:46:17 UTC

[1/2] flink git commit: [FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.

Repository: flink
Updated Branches:
  refs/heads/master 6b253d9f8 -> 864357bac


[FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.

This also adds to the deprecated classes pointers forward to the designated classes.

This closes #1380


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/864357ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/864357ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/864357ba

Branch: refs/heads/master
Commit: 864357bacee3531d21a02c951c4b924fb0494eb6
Parents: 2061206
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 18 20:30:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:45:44 2015 +0100

----------------------------------------------------------------------
 docs/apis/kafka.md                              |  63 ----------
 docs/apis/streaming_guide.md                    | 121 ++++++-------------
 .../connectors/kafka/FlinkKafkaProducer.java    |   5 +-
 .../connectors/kafka/api/KafkaSink.java         |   2 +
 .../api/persistent/PersistentKafkaSource.java   |   5 +
 5 files changed, 47 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/kafka.md b/docs/apis/kafka.md
deleted file mode 100644
index 0c0790a..0000000
--- a/docs/apis/kafka.md
+++ /dev/null
@@ -1,63 +0,0 @@
----
-title: "Reading from Kafka"
-is_beta: true
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-Interact with [Apache Kafka](https://kafka.apache.org/) streams from Flink's APIs.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Kafka Connector
------------
-
-### Background
-
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different 
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
-
-Flink provides different connector implementations for different use-cases and environments.
-
-
-
-
-### How to read data from Kafka
-
-#### Choose appropriate package and class
-
-Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
-
-| Package                     | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| -------------               |-------------| -----| ------ | ------ |
-| flink-connector-kafka       | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka       | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1  | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2  | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 1c97dd9..fb6d86a 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3367,74 +3367,43 @@ with connectors.
 
 This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
 
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
+Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
 
 Please pick a package (maven artifact id) and class name for your use-case and environment.
-For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
+For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate.
 
 
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left">Package</th>
+      <th class="text-left">Maven Dependency</th>
       <th class="text-left">Supported since</th>
       <th class="text-left">Class name</th>
-      <th class="text-left">Kafka version</th>      
-      <th class="text-left">Checkpointing behavior</th>
-      <th class="text-left">Notes</th>            
+      <th class="text-left">Kafka version</th>
+      <th class="text-left">Notes</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td>flink-connector-kafka</td>
-        <td>0.9, 0.10</td>
-        <td>KafkaSource</td>
-	<td>0.8.1, 0.8.2</td>	
-	<td>Does not participate in checkpointing (no consistency guarantees)</td>
-	<td>Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka</td>	
-    </tr>
-    <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9, 0.10</td>
-        <td>PersistentKafkaSource</td>
-	<td>0.8.1, 0.8.2</td>	
-	<td>Does not guarantee exactly-once processing, element order, or strict partition assignment</td>
-	<td>Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually</td>	
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-083</td>
         <td>0.9.1, 0.10</td>
         <td>FlinkKafkaConsumer081</td>
-	<td>0.8.1</td>	
-	<td>Guarantees exactly-once processing</td>
-	<td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>	
+        <td>0.8.1</td>	
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>	
     </tr>
     <tr>
-        <td>flink-connector-kafka-083</td>
+        <td>flink-connector-kafka</td>
         <td>0.9.1, 0.10</td>
         <td>FlinkKafkaConsumer082</td>
-	<td>0.8.2</td>	
-	<td>Guarantee exactly-once processing</td>
-	<td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>	
-    </tr>    
+        <td>0.8.2</td>	
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>	
+    </tr>
   </tbody>
 </table>
 
-
-<!--
-| Package                     | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| -------------               |-------------| -----| ------ | ------ |
-| flink-connector-kafka       | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka       | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1  | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2  | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
--->
-
 Then, import the connector in your maven project:
 
 {% highlight xml %}
@@ -3448,15 +3417,14 @@ Then, import the connector in your maven project:
 Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
 #### Installing Apache Kafka
+
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
 #### Kafka Consumer
 
-The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic.
-
-The following parameters have to be provided for the `FlinkKafkaConsumer082(...)` constructor:
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. It takes the following parameters to the constructor:
 
 1. The topic name
 2. A DeserializationSchema
@@ -3495,12 +3463,12 @@ stream = env
 
 #### Kafka Consumers and Fault Tolerance
 
-As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that where
+stored in the checkpoint.
 
-The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will
-continue on reading from where it left off after a restart.
-For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job
-failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.
+The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
 
 To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
 
@@ -3508,7 +3476,13 @@ To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000);
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
 {% endhighlight %}
 </div>
 </div>
@@ -3518,52 +3492,29 @@ So if the topology fails due to loss of a TaskManager, there must still be enoug
 Flink on YARN supports automatic restart of lost YARN containers.
 
 
-#### Kafka Sink
-
-A class providing an interface for sending data to Kafka.
+#### Kafka Producer
 
-The following arguments have to be provided for the `KafkaSink(…)` constructor in order:
-
-1. Broker address (in hostname:port format, can be a comma separated list)
-2. The topic name
-3. Serialization schema
+The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
+recors to partitions.
 
 Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
-
-The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
-      SerializationSchema<IN, byte[]> serializationSchema)
+stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
-      SerializationSchema serializationSchema)
+stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
 {% endhighlight %}
 </div>
 </div>
 
-If this constructor is used, the user needs to make sure to set the broker(s) with the "metadata.broker.list" property.
-Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.
-
-The Apache Kafka official documentation can be found [here](https://kafka.apache.org/documentation.html).
+You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
+Kafka Producers.
 
 [Back to top](#top)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 715f5ee..5e08464 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -278,9 +278,12 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	
 	public static Properties getPropertiesFromBrokerList(String brokerList) {
 		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
+		
+		// validate the broker addresses
+		for (String broker: elements) {
 			NetUtils.getCorrectHostnamePort(broker);
 		}
+		
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
 		return props;

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f856926..e832f20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  *
  * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
  * This class will be removed in future releases of Flink.
+ * 
+ * @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
  */
 @Deprecated
 public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 869c44f..2efeb20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,11 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
  * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
  *
  * @param <T> The type of elements produced by this consumer.
+ * 
+ * @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the
+ *             Kafka version specific consumers, like
+ *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081}, 
+ *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
  */
 @Deprecated
 public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {


[2/2] flink git commit: [FLINK-3005] [core] Bump commons-collections version to fix object deserialization remote command execution vulnerability

Posted by se...@apache.org.
[FLINK-3005] [core] Bump commons-collections version to fix object deserialization remote command execution vulnerability

This closes #1381


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20612063
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20612063
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20612063

Branch: refs/heads/master
Commit: 206120631d97898c9396d74b2450eb36af17e06a
Parents: 6b253d9
Author: tedyu <yu...@gmail.com>
Authored: Wed Nov 18 13:56:31 2015 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:45:44 2015 +0100

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/20612063/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af4ff8d..95c7b6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,7 +210,7 @@ under the License.
 			<dependency>
 				<groupId>commons-collections</groupId>
 				<artifactId>commons-collections</artifactId>
-				<version>3.2.1</version>
+				<version>3.2.2</version>
 			</dependency>
 
 			<dependency>