Posted to commits@flink.apache.org by se...@apache.org on 2015/11/19 14:46:17 UTC
[1/2] flink git commit: [FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.
Repository: flink
Updated Branches:
refs/heads/master 6b253d9f8 -> 864357bac
[FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.
This also adds forward pointers from the deprecated classes to their designated replacements.
This closes #1380
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/864357ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/864357ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/864357ba
Branch: refs/heads/master
Commit: 864357bacee3531d21a02c951c4b924fb0494eb6
Parents: 2061206
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 18 20:30:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:45:44 2015 +0100
----------------------------------------------------------------------
docs/apis/kafka.md | 63 ----------
docs/apis/streaming_guide.md | 121 ++++++-------------
.../connectors/kafka/FlinkKafkaProducer.java | 5 +-
.../connectors/kafka/api/KafkaSink.java | 2 +
.../api/persistent/PersistentKafkaSource.java | 5 +
5 files changed, 47 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/kafka.md b/docs/apis/kafka.md
deleted file mode 100644
index 0c0790a..0000000
--- a/docs/apis/kafka.md
+++ /dev/null
@@ -1,63 +0,0 @@
----
-title: "Reading from Kafka"
-is_beta: true
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-Interact with [Apache Kafka](https://kafka.apache.org/) streams from Flink's APIs.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Kafka Connector
------------
-
-### Background
-
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
-
-Flink provides different connector implementations for different use-cases and environments.
-
-
-
-
-### How to read data from Kafka
-
-#### Choose appropriate package and class
-
-Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
-
-| Package | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| ------------- |-------------| -----| ------ | ------ |
-| flink-connector-kafka | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 1c97dd9..fb6d86a 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3367,74 +3367,43 @@ with connectors.
This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
+Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
Please pick a package (maven artifact id) and class name for your use-case and environment.
-For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
+For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate.
<table class="table table-bordered">
<thead>
<tr>
- <th class="text-left">Package</th>
+ <th class="text-left">Maven Dependency</th>
<th class="text-left">Supported since</th>
<th class="text-left">Class name</th>
- <th class="text-left">Kafka version</th>
- <th class="text-left">Checkpointing behavior</th>
- <th class="text-left">Notes</th>
+ <th class="text-left">Kafka version</th>
+ <th class="text-left">Notes</th>
</tr>
</thead>
<tbody>
<tr>
<td>flink-connector-kafka</td>
- <td>0.9, 0.10</td>
- <td>KafkaSource</td>
- <td>0.8.1, 0.8.2</td>
- <td>Does not participate in checkpointing (no consistency guarantees)</td>
- <td>Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka</td>
- </tr>
- <tr>
- <td>flink-connector-kafka</td>
- <td>0.9, 0.10</td>
- <td>PersistentKafkaSource</td>
- <td>0.8.1, 0.8.2</td>
- <td>Does not guarantee exactly-once processing, element order, or strict partition assignment</td>
- <td>Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually</td>
- </tr>
- <tr>
- <td>flink-connector-kafka-083</td>
<td>0.9.1, 0.10</td>
<td>FlinkKafkaConsumer081</td>
- <td>0.8.1</td>
- <td>Guarantees exactly-once processing</td>
- <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
+ <td>0.8.1</td>
+ <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
<tr>
- <td>flink-connector-kafka-083</td>
+ <td>flink-connector-kafka</td>
<td>0.9.1, 0.10</td>
<td>FlinkKafkaConsumer082</td>
- <td>0.8.2</td>
- <td>Guarantee exactly-once processing</td>
- <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
- </tr>
+ <td>0.8.2</td>
+ <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+ </tr>
</tbody>
</table>
-
-<!--
-| Package | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| ------------- |-------------| -----| ------ | ------ |
-| flink-connector-kafka | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
--->
-
Then, import the connector in your maven project:
{% highlight xml %}
@@ -3448,15 +3417,14 @@ Then, import the connector in your maven project:
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
#### Installing Apache Kafka
+
* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
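For the remote-machine case mentioned in the last point, the broker-side change is a single line in `config/server.properties` (the address below is a documentation placeholder; use the machine's real IP):

    # config/server.properties on the Kafka broker host
    advertised.host.name=192.0.2.10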
#### Kafka Consumer
-The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic.
-
-The following parameters have to be provided for the `FlinkKafkaConsumer082(...)` constructor:
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. Its constructor takes the following parameters:
1. The topic name
2. A DeserializationSchema
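For orientation, wiring the consumer into a program looks roughly like the following. This is a minimal sketch: the hunk above elides the rest of the parameter list, so the `Properties` argument and the concrete keys (`zookeeper.connect`, `bootstrap.servers`, `group.id`) are assumptions based on the rendered guide of that release.

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Consumer configuration; the keys are assumed, see the note above.
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-consumer-group");

    // Topic name and DeserializationSchema, as listed above.
    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));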
@@ -3495,12 +3463,12 @@ stream = env
#### Kafka Consumers and Fault Tolerance
-As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
+stored in the checkpoint.
-The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will
-continue on reading from where it left off after a restart.
-For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job
-failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.
+The checkpoint interval therefore defines how far back the program may have to be rewound, at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
@@ -3508,7 +3476,13 @@ To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000);
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
{% endhighlight %}
</div>
</div>
@@ -3518,52 +3492,29 @@ So if the topology fails due to loss of a TaskManager, there must still be enoug
Flink on YARN supports automatic restart of lost YARN containers.
-#### Kafka Sink
-
-A class providing an interface for sending data to Kafka.
+#### Kafka Producer
-The following arguments have to be provided for the `KafkaSink(…)` constructor in order:
-
-1. Broker address (in hostname:port format, can be a comma separated list)
-2. The topic name
-3. Serialization schema
+The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
+records to partitions.
Example:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
-
-The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
- SerializationSchema<IN, byte[]> serializationSchema)
+stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
- SerializationSchema serializationSchema)
+stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
{% endhighlight %}
</div>
</div>
-If this constructor is used, the user needs to make sure to set the broker(s) with the "metadata.broker.list" property.
-Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.
-
-The Apache Kafka official documentation can be found [here](https://kafka.apache.org/documentation.html).
+You can also define a custom Kafka producer configuration for the FlinkKafkaProducer via its constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
+Kafka Producers.
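As a sketch of what such a custom configuration can look like, assuming the `(topic, serialization schema, properties)` constructor overload and reusing the `getPropertiesFromBrokerList` helper touched by this commit (`stream` is the variable from the example above; `acks` and `retries` are ordinary Kafka producer settings, shown only as examples):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    // Start from a validated bootstrap.servers list, then add plain Kafka settings.
    Properties producerConfig = FlinkKafkaProducer.getPropertiesFromBrokerList("localhost:9092");
    producerConfig.setProperty("acks", "all");
    producerConfig.setProperty("retries", "3");

    stream.addSink(new FlinkKafkaProducer<String>("my-topic", new SimpleStringSchema(), producerConfig));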
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 715f5ee..5e08464 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -278,9 +278,12 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
- for(String broker: elements) {
+
+ // validate the broker addresses
+ for (String broker: elements) {
NetUtils.getCorrectHostnamePort(broker);
}
+
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
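A usage sketch of the validated helper (the broker names are placeholders):

    // Returns a Properties object with bootstrap.servers set to the full list.
    // Each entry must parse as hostname:port; a malformed entry such as "broker1"
    // is rejected by NetUtils.getCorrectHostnamePort(...) with an exception,
    // so bad addresses fail fast instead of surfacing later in the Kafka client.
    Properties props = FlinkKafkaProducer.getPropertiesFromBrokerList("broker1:9092,broker2:9092");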
http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f856926..e832f20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
*
* The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
* This class will be removed in future releases of Flink.
+ *
+ * @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
*/
@Deprecated
public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 869c44f..2efeb20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,11 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
* Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
*
* @param <T> The type of elements produced by this consumer.
+ *
+ * @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the
+ * Kafka version specific consumers, like
+ * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081},
+ * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
*/
@Deprecated
public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
[2/2] flink git commit: [FLINK-3005] [core] Bump commons-collections version to fix object deserialization remote command execution vulnerability
Posted by se...@apache.org.
[FLINK-3005] [core] Bump commons-collections version to fix object deserialization remote command execution vulnerability
This closes #1381
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20612063
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20612063
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20612063
Branch: refs/heads/master
Commit: 206120631d97898c9396d74b2450eb36af17e06a
Parents: 6b253d9
Author: tedyu <yu...@gmail.com>
Authored: Wed Nov 18 13:56:31 2015 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:45:44 2015 +0100
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/20612063/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af4ff8d..95c7b6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,7 +210,7 @@ under the License.
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
- <version>3.2.1</version>
+ <version>3.2.2</version>
</dependency>
<dependency>
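For downstream projects that pick up commons-collections transitively through Flink, the same pin can be forced in their own build with standard Maven dependency management (a sketch for a consumer pom.xml; it is not part of this commit):

    <dependencyManagement>
      <dependencies>
        <!-- Force the patched commons-collections everywhere in the build. -->
        <dependency>
          <groupId>commons-collections</groupId>
          <artifactId>commons-collections</artifactId>
          <version>3.2.2</version>
        </dependency>
      </dependencies>
    </dependencyManagement>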