Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/11 08:05:12 UTC
[2/3] flink git commit: [FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector
[FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector
Add a test case for Kafka's new timestamp functionality and update the documentation.
This closes #2369
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6731ec1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6731ec1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6731ec1e
Branch: refs/heads/master
Commit: 6731ec1e48d0a0092dd2330adda73bcf37fda8d7
Parents: 63859c6
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Aug 9 16:38:21 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Oct 11 10:04:25 2016 +0200
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 67 +++-
docs/page/js/flink.js | 3 +-
.../flink-connector-kafka-0.10/pom.xml | 50 +--
.../connectors/kafka/FlinkKafkaConsumer010.java | 121 +------
.../connectors/kafka/FlinkKafkaProducer010.java | 315 +++++++++++++++++--
.../kafka/Kafka010JsonTableSource.java | 2 +-
.../connectors/kafka/Kafka010TableSource.java | 2 +-
.../kafka/internal/Kafka010Fetcher.java | 268 ++--------------
.../connectors/kafka/Kafka010ITCase.java | 266 +++++++++++-----
.../connectors/kafka/KafkaProducerTest.java | 119 -------
.../kafka/KafkaTestEnvironmentImpl.java | 80 ++++-
.../kafka/internals/SimpleConsumerThread.java | 2 +-
.../kafka/KafkaTestEnvironmentImpl.java | 7 +-
.../connectors/kafka/FlinkKafkaConsumer09.java | 4 +-
.../connectors/kafka/FlinkKafkaProducer09.java | 2 +-
.../kafka/internal/Kafka09Fetcher.java | 22 +-
.../kafka/KafkaTestEnvironmentImpl.java | 6 +-
.../kafka/FlinkKafkaConsumerBase.java | 4 +
.../kafka/FlinkKafkaProducerBase.java | 4 +-
.../kafka/internals/AbstractFetcher.java | 43 +--
...picPartitionStateWithPeriodicWatermarks.java | 4 +-
...cPartitionStateWithPunctuatedWatermarks.java | 4 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 201 ++++++------
.../connectors/kafka/KafkaProducerTestBase.java | 5 +-
.../kafka/KafkaShortRetentionTestBase.java | 4 +-
.../connectors/kafka/KafkaTestEnvironment.java | 7 +-
.../AbstractFetcherTimestampsTest.java | 68 ++--
.../kafka/testutils/DataGenerators.java | 87 ++---
.../testutils/JobManagerCommunicationUtils.java | 21 +-
29 files changed, 936 insertions(+), 852 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d2221fa..9a360d4 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -46,14 +46,6 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
</thead>
<tbody>
<tr>
- <td>flink-connector-kafka</td>
- <td>0.9.1, 0.10</td>
- <td>FlinkKafkaConsumer082<br>
- FlinkKafkaProducer</td>
- <td>0.8.x</td>
- <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
- </tr>
- <tr>
<td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer08<br>
@@ -61,7 +53,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
<td>0.8.x</td>
<td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
- <tr>
+ <tr>
<td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer09<br>
@@ -69,6 +61,14 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
<td>0.9.x</td>
<td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
</tr>
+ <tr>
+ <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
+ <td>1.2.0</td>
+ <td>FlinkKafkaConsumer010<br>
+ FlinkKafkaProducer010</td>
+ <td>0.10.x</td>
+ <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
+ </tr>
</tbody>
</table>
@@ -87,7 +87,6 @@ Note that the streaming connectors are currently not part of the binary distribu
### Installing Apache Kafka
* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
-* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
### Kafka Consumer
@@ -256,17 +255,28 @@ records to partitions.
Example:
+
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}
</div>
-<div data-lang="scala" markdown="1">
+<div data-lang="java, Kafka 0.10+" markdown="1">
+{% highlight java %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+{% endhighlight %}
+</div>
+<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
{% endhighlight %}
</div>
+<div data-lang="scala, Kafka 0.10+" markdown="1">
+{% highlight scala %}
+FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties)
+{% endhighlight %}
+</div>
</div>
You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
@@ -287,3 +297,36 @@ higher value.
There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
into a Kafka topic.
+
+### Using Kafka timestamps and Flink event time in Kafka 0.10
+
+Since Apache Kafka 0.10, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
+the time the event occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
+was written to the Kafka broker.
+
+The `FlinkKafkaConsumer010` will emit records with the timestamp attached if the time characteristic in Flink is
+set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
+
+The Kafka consumer does not emit watermarks. To emit watermarks, apply the same mechanisms as described above in
+"Kafka Consumers and Timestamp Extraction/Watermark Emission", using the `assignTimestampsAndWatermarks` method.
+
+There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of
+the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
+
+A timestamp extractor for a Kafka consumer would look like this:
+{% highlight java %}
+public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return previousElementTimestamp;
+}
+{% endhighlight %}
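+
+Such an extractor can also be extended to emit watermarks derived from the Kafka timestamps. The following is a
+minimal sketch (assuming `Long` elements and at most one second of out-of-orderness; the class name is made up
+for illustration):
+{% highlight java %}
+public static class KafkaTimestampWatermarkAssigner implements AssignerWithPeriodicWatermarks<Long> {
+
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    @Override
+    public long extractTimestamp(Long element, long previousElementTimestamp) {
+        // previousElementTimestamp carries the timestamp of the Kafka message
+        maxTimestamp = Math.max(maxTimestamp, previousElementTimestamp);
+        return previousElementTimestamp;
+    }
+
+    @Override
+    public Watermark getCurrentWatermark() {
+        // trail the highest Kafka timestamp seen so far by one second
+        return new Watermark(maxTimestamp - 1000L);
+    }
+}
+{% endhighlight %}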
+
+
+
+The `FlinkKafkaProducer010` only writes the record's timestamp to Kafka if `setWriteTimestampToKafka(true)` is set.
+
+{% highlight java %}
+FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
+config.setWriteTimestampToKafka(true);
+{% endhighlight %}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/page/js/flink.js
----------------------------------------------------------------------
diff --git a/docs/page/js/flink.js b/docs/page/js/flink.js
index fdf972c..885a8ff 100644
--- a/docs/page/js/flink.js
+++ b/docs/page/js/flink.js
@@ -42,6 +42,7 @@ function codeTabs() {
var image = $(this).data("image");
var notabs = $(this).data("notabs");
var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+ lang = lang.replace(/[^a-zA-Z0-9]/g, "_");
var id = "tab_" + lang + "_" + counter;
$(this).attr("id", id);
if (image != null && langImages[lang]) {
@@ -99,9 +100,7 @@ function viewSolution() {
// A script to fix internal hash links because we have an overlapping top bar.
// Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
function maybeScrollToHash() {
- console.log("HERE");
if (window.location.hash && $(window.location.hash).length) {
- console.log("HERE2", $(window.location.hash), $(window.location.hash).offset().top);
var newTop = $(window.location.hash).offset().top - 57;
$(window).scrollTop(newTop);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index f2bcb11..0b426b5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <kafka.version>0.10.0.0</kafka.version>
+ <kafka.version>0.10.0.1</kafka.version>
</properties>
<dependencies>
@@ -46,21 +46,16 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
+ <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
</dependency>
+ <!-- Add Kafka 0.10.x as a dependency -->
+
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-base_2.10</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
@@ -73,20 +68,29 @@ under the License.
<optional>true</optional>
</dependency>
+ <!-- test dependencies -->
+
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- exclude Kafka dependencies -->
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
- <!-- test dependencies -->
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.10</artifactId>
<version>${project.version}</version>
<exclusions>
- <!-- exclude 0.8 dependencies -->
+ <!-- exclude Kafka dependencies -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -127,6 +131,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-jmx</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 78ccd4a..267ff57 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -28,20 +28,10 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.SerializedValue;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -64,30 +54,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* is constructed. That means that the client that submits the program needs to be able to
* reach the Kafka brokers or ZooKeeper.</p>
*/
-public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
+public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
private static final long serialVersionUID = 2324564345203409112L;
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);
-
- /** Configuration key to change the polling timeout **/
- public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-
- /** Boolean configuration key to disable metrics tracking **/
- public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
- /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
- * available. If 0, returns immediately with any records that are available now. */
- public static final long DEFAULT_POLL_TIMEOUT = 100L;
-
- // ------------------------------------------------------------------------
-
- /** User-supplied properties for Kafka **/
- private final Properties properties;
-
- /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
- * available. If 0, returns immediately with any records that are available now */
- private final long pollTimeout;
// ------------------------------------------------------------------------
@@ -151,51 +121,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
- super(deserializer);
-
- checkNotNull(topics, "topics");
- this.properties = checkNotNull(props, "props");
- setDeserializer(this.properties);
-
- // configure the polling timeout
- try {
- if (properties.containsKey(KEY_POLL_TIMEOUT)) {
- this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
- } else {
- this.pollTimeout = DEFAULT_POLL_TIMEOUT;
- }
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
- }
-
- // read the partitions that belong to the listed topics
- final List<KafkaTopicPartition> partitions = new ArrayList<>();
-
- try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
- for (final String topic: topics) {
- // get partitions for each topic
- List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
- // for non existing topics, the list might be null.
- if (partitionsForTopic != null) {
- partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
- }
- }
- }
-
- if (partitions.isEmpty()) {
- throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
- }
-
- // we now have a list of partitions which is the same for all parallel consumer instances.
- LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
-
- if (LOG.isInfoEnabled()) {
- logPartitionInfo(LOG, partitions);
- }
-
- // register these partitions
- setSubscribedPartitions(partitions);
+ super(topics, deserializer, props);
}
@Override
@@ -212,48 +138,5 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
watermarksPeriodic, watermarksPunctuated,
runtimeContext, deserializer,
properties, pollTimeout, useMetrics);
-
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
- *
- * @param partitions A list of Kafka PartitionInfos.
- * @return A list of KafkaTopicPartitions
- */
- private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
- checkNotNull(partitions);
-
- List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
- for (PartitionInfo pi : partitions) {
- ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
- }
- return ret;
- }
-
- /**
- * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
- *
- * @param props The Kafka properties to register the serializer in.
- */
- private static void setDeserializer(Properties props) {
- final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
-
- Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-
- if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
- LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- }
- if (valDeSer != null && !valDeSer.equals(deSerName)) {
- LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- }
-
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
}
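
For context, a minimal usage sketch of the refactored consumer (the topic name, properties, and environment
setup are placeholder assumptions; the 0.10 constructor simply delegates to the 0.9 consumer as shown above):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // behaves like the 0.9 consumer, but additionally attaches Kafka's record timestamps
    DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props));
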
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 49bce39..cc0194b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,27 +17,123 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink class.
+ *
+ * Details about approach (a):
*
- * Please note that this producer does not have any reliability guarantees.
+ * Pre-0.10 producers only follow approach (a), attaching the producer via the
+ * DataStream.addSink() method.
+ * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ * the Kafka 0.10 producer offers a second invocation option, approach (b).
*
- * @param <IN> Type of the messages to write into Kafka.
+ * Details about approach (b):
+ * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ * can access the internal timestamp of the record and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
*/
-public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+
+ /**
+ * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+ */
+ private boolean writeTimestampToKafka = false;
+
+ // ---------------------- "Constructors" for timestamp writing ------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes the contents of the DataStream to
+ * the topic.
+ *
+ * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+ }
+
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes the contents of the DataStream to
+ * the topic.
+ *
+ * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ SerializationSchema<T> serializationSchema,
+ Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes the contents of the DataStream to
+ * the topic.
+ *
+ * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig,
+ KafkaPartitioner<T> customPartitioner) {
- private static final long serialVersionUID = 1L;
+ GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+ SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ }
- // ------------------- Keyless serialization schema constructors ----------------------
+ // ---------------------- Regular constructors w/o timestamp support ------------------
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -50,8 +146,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema
* User defined (keyless) serialization schema.
*/
- public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
}
/**
@@ -65,8 +161,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
* @param producerConfig
* Properties with the producer configuration.
*/
- public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
}
/**
@@ -78,9 +174,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*/
- public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
}
// ------------------- Key/Value serialization schema constructors ----------------------
@@ -96,8 +191,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema
* User defined serialization schema supporting key/value messages
*/
- public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
}
/**
@@ -111,27 +206,193 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
* @param producerConfig
* Properties with the producer configuration.
*/
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
+ this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
}
/**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
+ * Creates a Kafka producer.
*
- * @param topicId The topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
*/
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+ // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+ // invoke call.
+ super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+ }
+
+
+ // ----------------------------- Generic element processing ---------------------------
+
+ private void invokeInternal(T next, long elementTimestamp) throws Exception {
+
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+
+ internalProducer.checkErroneous();
+
+ byte[] serializedKey = internalProducer.schema.serializeKey(next);
+ byte[] serializedValue = internalProducer.schema.serializeValue(next);
+ String targetTopic = internalProducer.schema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = internalProducer.defaultTopicId;
+ }
+
+ Long timestamp = null;
+ if (this.writeTimestampToKafka) {
+ timestamp = elementTimestamp;
+ }
+
+ ProducerRecord<byte[], byte[]> record;
+ if (internalProducer.partitioner == null) {
+ record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+ } else {
+ record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+ }
+ if (internalProducer.flushOnCheckpoint) {
+ synchronized (internalProducer.pendingRecordsLock) {
+ internalProducer.pendingRecords++;
+ }
+ }
+ internalProducer.producer.send(record, internalProducer.callback);
}
+
+ // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
+
+
+ // ---- Configuration setters
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, then exceptions will only be logged; if set to false,
+ * exceptions will eventually be thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * This method is only available for approach (a) (see above).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.setLogFailuresOnly(logFailuresOnly);
+ }
+
+ /**
+ * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+ * to be acknowledged by the Kafka producer on a checkpoint.
+ * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+ *
+ * This method is only available for approach (a) (see above).
+ *
+ * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+ */
+ public void setFlushOnCheckpoint(boolean flush) {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.setFlushOnCheckpoint(flush);
+ }
+
+ /**
+ * This method is used for approach (a) (see above).
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.open(parameters);
+ }
+
+ /**
+ * This method is used for approach (a) (see above)
+ */
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ return internalProducer.getIterationRuntimeContext();
+ }
+
+ /**
+ * This method is used for approach (a) (see above)
+ */
@Override
- protected void flush() {
- if (this.producer != null) {
- producer.flush();
+ public void setRuntimeContext(RuntimeContext t) {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.setRuntimeContext(t);
+ }
+
+ /**
+ * Invoke method for using the Sink as DataStream.addSink() sink.
+ *
+ * This method is used for approach (a) (see above)
+ *
+ * @param value The input record.
+ */
+ @Override
+ public void invoke(T value) throws Exception {
+ invokeInternal(value, Long.MAX_VALUE);
+ }
+
+
+ // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
+
+
+ /**
+ * Process method for using the sink with timestamp support.
+ *
+ * This method is used for approach (b) (see above)
+ */
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ invokeInternal(element.getValue(), element.getTimestamp());
+ }
+
+ /**
+ * Configuration object returned by the writeToKafkaWithTimestamps() call.
+ */
+ public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
+
+ private final FlinkKafkaProducerBase wrappedProducerBase;
+ private final FlinkKafkaProducer010 producer;
+
+ private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+ //noinspection unchecked
+ super(stream, producer);
+ this.producer = producer;
+ this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
+ }
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, then exceptions will only be logged; if set to false,
+ * exceptions will eventually be thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+ }
+
+ /**
+ * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+ * to be acknowledged by the Kafka producer on a checkpoint.
+ * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+ *
+ * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+ */
+ public void setFlushOnCheckpoint(boolean flush) {
+ this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+ }
+
+ /**
+ * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+ * Timestamps must be positive for Kafka to accept them.
+ *
+ * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+ */
+ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+ this.producer.writeTimestampToKafka = writeTimestampToKafka;
}
}
+
+
}
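
To illustrate the two invocation styles described in the class Javadoc (a sketch only; `stream`, `props`,
and the broker address are placeholders, and the calls follow the signatures above):

    // Approach (a): regular sink via DataStream.addSink(); record timestamps are not accessible
    stream.addSink(new FlinkKafkaProducer010<>("localhost:9092", "my-topic", new SimpleStringSchema()));

    // Approach (b): custom operator attached via writeToKafkaWithTimestamps(); can forward
    // Flink's record timestamps to Kafka when enabled
    FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), props);
    config.setWriteTimestampToKafka(true);
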
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index cda68ce..ddf1ad3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-public class Kafka010JsonTableSource extends KafkaJsonTableSource {
+public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
/**
* Creates a Kafka 0.10 JSON {@link StreamTableSource}.
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index cee1b90..732440b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -28,7 +28,7 @@ import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
-public class Kafka010TableSource extends KafkaTableSource {
+public class Kafka010TableSource extends Kafka09TableSource {
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 70f530b..47bee22 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -18,37 +18,20 @@
package org.apache.flink.streaming.connectors.kafka.internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
/**
@@ -56,40 +39,7 @@ import java.util.Properties;
*
* @param <T> The type of elements produced by the fetcher.
*/
-public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(Kafka010Fetcher.class);
-
- // ------------------------------------------------------------------------
-
- /** The schema to convert between Kafka's byte messages, and Flink's objects */
- private final KeyedDeserializationSchema<T> deserializer;
-
- /** The subtask's runtime context */
- private final RuntimeContext runtimeContext;
-
- /** The configuration for the Kafka consumer */
- private final Properties kafkaProperties;
-
- /** The maximum number of milliseconds to wait for a fetch batch */
- private final long pollTimeout;
-
- /** Flag whether to register Kafka metrics as Flink accumulators */
- private final boolean forwardKafkaMetrics;
-
- /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
- private final Object consumerLock = new Object();
-
- /** Reference to the Kafka consumer, once it is created */
- private volatile KafkaConsumer<byte[], byte[]> consumer;
-
- /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
- private volatile ExceptionProxy errorHandler;
-
- /** Flag to mark the main work loop as alive */
- private volatile boolean running = true;
-
- // ------------------------------------------------------------------------
+public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
public Kafka010Fetcher(
SourceContext<T> sourceContext,
@@ -100,213 +50,47 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> imple
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
- boolean forwardKafkaMetrics) throws Exception
+ boolean useMetrics) throws Exception
{
- super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
-
- this.deserializer = deserializer;
- this.runtimeContext = runtimeContext;
- this.kafkaProperties = kafkaProperties;
- this.pollTimeout = pollTimeout;
- this.forwardKafkaMetrics = forwardKafkaMetrics;
-
- // if checkpointing is enabled, we are not automatically committing to Kafka.
- kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+ super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics);
}
- // ------------------------------------------------------------------------
- // Fetcher work methods
- // ------------------------------------------------------------------------
-
@Override
- public void runFetchLoop() throws Exception {
- this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
- // rather than running the main fetch loop directly here, we spawn a dedicated thread
- // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
- Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
- runner.setDaemon(true);
- runner.start();
-
- try {
- runner.join();
- } catch (InterruptedException e) {
- // may be the result of a wake-up after an exception. we ignore this here and only
- // restore the interruption state
- Thread.currentThread().interrupt();
- }
-
- // make sure we propagate any exception that occurred in the concurrent fetch thread,
- // before leaving this method
- this.errorHandler.checkAndThrowException();
+ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+ consumer.assign(topicPartitions);
}
@Override
- public void cancel() {
- // flag the main thread to exit
- running = false;
-
- // NOTE:
- // - We cannot interrupt the runner thread, because the Kafka consumer may
- // deadlock when the thread is interrupted while in certain methods
- // - We cannot call close() on the consumer, because it will actually throw
- // an exception if a concurrent call is in progress
-
- // make sure the consumer finds out faster that we are shutting down
- if (consumer != null) {
- consumer.wakeup();
- }
+ protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+ // get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
+ super.emitRecord(record, partition, offset, consumerRecord.timestamp());
}
+ /**
+ * Emits the record with the Kafka-provided timestamp attached.
+ */
@Override
- public void run() {
- // This method initializes the KafkaConsumer and guarantees it is torn down properly.
- // This is important, because the consumer has multi-threading issues,
- // including concurrent 'close()' calls.
-
- final KafkaConsumer<byte[], byte[]> consumer;
- try {
- consumer = new KafkaConsumer<>(kafkaProperties);
- }
- catch (Throwable t) {
- running = false;
- errorHandler.reportError(t);
- return;
- }
-
- // from here on, the consumer will be closed properly
- try {
- consumer.assign(convertKafkaPartitions(subscribedPartitions()));
-
- // register Kafka metrics to Flink accumulators
- if (forwardKafkaMetrics) {
- Map<MetricName, ? extends Metric> metrics = consumer.metrics();
- if (metrics == null) {
- // MapR's Kafka implementation returns null here.
- LOG.info("Consumer implementation does not support metrics");
- } else {
- // we have metrics, register them where possible
- for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
- String name = "KafkaConsumer-" + metric.getKey().name();
- DefaultKafkaMetricAccumulator kafkaAccumulator =
- DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-
- // best effort: we only add the accumulator if available.
- if (kafkaAccumulator != null) {
- runtimeContext.addAccumulator(name, kafkaAccumulator);
- }
- }
- }
- }
-
- // seek the consumer to the initial offsets
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
- if (partition.isOffsetDefined()) {
- consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
- }
- }
-
- // from now on, external operations may call the consumer
- this.consumer = consumer;
-
- // main fetch loop
- while (running) {
- // get the next batch of records
- final ConsumerRecords<byte[], byte[]> records;
- synchronized (consumerLock) {
- try {
- records = consumer.poll(pollTimeout);
- }
- catch (WakeupException we) {
- if (running) {
- throw we;
- } else {
- continue;
- }
- }
- }
-
- // get the records for each topic partition
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-
- List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
-
- for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
- T value = deserializer.deserialize(
- record.key(), record.value(),
- record.topic(), record.partition(), record.offset());
-
- if (deserializer.isEndOfStream(value)) {
- // end of stream signaled
- running = false;
- break;
- }
-
- // emit the actual record. this also update offset state atomically
- // and deals with timestamps and watermark generation
- emitRecord(value, partition, record.offset());
- }
- }
+ protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks
+
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
}
- // end main fetch loop
}
- catch (Throwable t) {
- if (running) {
- running = false;
- errorHandler.reportError(t);
- } else {
- LOG.debug("Stopped ConsumerThread threw exception", t);
- }
+ else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
}
- finally {
- try {
- synchronized (consumerLock) {
- consumer.close();
- }
- } catch (Throwable t) {
- LOG.warn("Error while closing Kafka 0.10 consumer", t);
- }
+ else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
}
}
- // ------------------------------------------------------------------------
- // Kafka 0.10 specific fetcher behavior
- // ------------------------------------------------------------------------
-
@Override
- public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
- return new TopicPartition(partition.getTopic(), partition.getPartition());
- }
-
- @Override
- public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
- KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
-
- for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
- Long offset = offsets.get(partition.getKafkaTopicPartition());
- if (offset != null) {
- offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
- }
- }
-
- if (this.consumer != null) {
- synchronized (consumerLock) {
- this.consumer.commitSync(offsetsToCommit);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public static Collection<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
- ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
- for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
- result.add(p.getKafkaPartitionHandle());
- }
- return result;
+ protected String getFetcherName() {
+ return "Kafka 0.10 Fetcher";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 5427853..28bf6d5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -17,14 +17,32 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
public class Kafka010ITCase extends KafkaConsumerTestBase {
@@ -33,10 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
// Suite of Tests
// ------------------------------------------------------------------------
- @Override
- public String getExpectedKafkaVersion() {
- return "0.10";
- }
@Test(timeout = 60000)
public void testFailOnNoBroker() throws Exception {
@@ -48,16 +62,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
runSimpleConcurrentProducerConsumerTopology();
}
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumer() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(false);
-// }
-
-// @Test(timeout = 60000)
-// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-// runExplicitPunctuatedWMgeneratingConsumerTest(true);
-// }
-
@Test(timeout = 60000)
public void testKeyValueSupport() throws Exception {
runKeyValueTest();
@@ -124,68 +128,168 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Test(timeout = 60000)
public void testMetricsAndEndOfStream() throws Exception {
- runMetricsAndEndOfStreamTest();
- }
-
- @Test
- public void testJsonTableSource() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
- topic,
- standardProps,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(false);
-
- runJsonTableSource(topic, tableSource);
- }
-
- @Test
- public void testJsonTableSourceWithFailOnMissingField() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
- topic,
- standardProps,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(true);
-
- try {
- runJsonTableSource(topic, tableSource);
- fail("Did not throw expected Exception");
- } catch (Exception e) {
- Throwable rootCause = e.getCause().getCause().getCause();
- assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+ runEndOfStreamTest();
+ }
+
+ /**
+ * Kafka 0.10-specific test, ensuring that timestamps are properly written to and read from Kafka.
+ */
+ @Test(timeout = 60000)
+ public void testTimestamps() throws Exception {
+
+ final String topic = "tstopic";
+ createTestTopic(topic, 3, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+ volatile boolean running = true; // volatile: cancel() may be called from a different thread
+
+ @Override
+ public void run(SourceContext<Long> ctx) throws Exception {
+ long i = 0;
+ while(running) {
+ ctx.collectWithTimestamp(i, i*2);
+ if(i++ == 1000L) {
+ running = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+ FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+ @Override
+ public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+ return (int) (next % 3); // spread records over the topic's three partitions
+ }
+ });
+ prod.setParallelism(3);
+ prod.setWriteTimestampToKafka(true);
+ env.execute("Produce some");
+
+ // ---------- Consume stream from Kafka -------------------
+
+ env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
+ kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+ if(lastElement % 10 == 0) {
+ return new Watermark(lastElement);
+ }
+ return null;
+ }
+
+ @Override
+ public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return previousElementTimestamp;
+ }
+ });
+
+ DataStream<Long> stream = env.addSource(kafkaSource);
+ GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+ env.execute("Consume again");
+
+ deleteTestTopic(topic);
+ }
+
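+ /** Validates record timestamps and watermark ordering; the wrapped dummy SinkFunction must never be invoked. */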
+ private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+ public TimestampValidatingOperator() {
+ super(new SinkFunction<Long>() {
+ @Override
+ public void invoke(Long value) throws Exception {
+ throw new RuntimeException("Unexpected");
+ }
+ });
+ }
+
+ long elCount = 0;
+ long wmCount = 0;
+ long lastWM = Long.MIN_VALUE;
+
+ @Override
+ public void processElement(StreamRecord<Long> element) throws Exception {
+ elCount++;
+ if(element.getValue() * 2 != element.getTimestamp()) {
+ throw new RuntimeException("Invalid timestamp: " + element);
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ wmCount++;
+
+ if (lastWM <= mark.getTimestamp()) {
+ lastWM = mark.getTimestamp();
+ } else {
+ throw new RuntimeException("Received watermark lower than the last one");
+ }
+
+ if (mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
+ throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if(elCount != 1000L) {
+ throw new RuntimeException("Wrong final element count " + elCount);
+ }
+
+ if (wmCount <= 2) {
+ throw new RuntimeException("Too few watermarks were received: " + wmCount);
+ }
+ }
+ }
+
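+ /** Deserializes Longs with Flink's own TypeSerializer and signals end-of-stream once more than 1000 records have been read. */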
+ private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+ private final TypeInformation<Long> ti;
+ private final TypeSerializer<Long> ser;
+ long cnt = 0;
+
+ public LimitedLongDeserializer() {
+ this.ti = TypeInfoParser.parse("Long");
+ this.ser = ti.createSerializer(new ExecutionConfig());
+ }
+ @Override
+ public TypeInformation<Long> getProducedType() {
+ return ti;
+ }
+
+ @Override
+ public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ cnt++;
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Long e = ser.deserialize(in);
+ return e;
+ }
+
+ @Override
+ public boolean isEndOfStream(Long nextElement) {
+ return cnt > 1000L;
}
}
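For context on the API the test above exercises: Kafka 0.10 is the first release whose message format carries a per-record timestamp, exposed through a timestamp argument on ProducerRecord and through ConsumerRecord.timestamp(). Below is a minimal, self-contained sketch against the plain 0.10 client; the broker address, topic name, and serializers are illustrative assumptions, not taken from this commit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class TimestampRoundTripSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer can attach an event timestamp to each record;
        // FlinkKafkaProducer010 uses this mechanism to forward Flink record timestamps.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("tstopic", null, 42L /* timestamp */, null, "payload")).get();
        }

        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "timestamp-check");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("tstopic", 0)));
            consumer.seekToBeginning(consumer.assignment());
            for (ConsumerRecord<String, String> record : consumer.poll(5000)) {
                // timestamp() and timestampType() are new in the 0.10 record format
                System.out.println(record.value() + " @ " + record.timestamp()
                        + " (" + record.timestampType() + ")");
            }
        }
    }
}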
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 5f5ac63..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPropagateExceptions() {
- try {
- // mock kafka producer
- KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-
- // partition setup
- when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
- Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
- // failure when trying to send an element
- when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
- .thenAnswer(new Answer<Future<RecordMetadata>>() {
- @Override
- public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
- Callback callback = (Callback) invocation.getArguments()[1];
- callback.onCompletion(null, new Exception("Test error"));
- return null;
- }
- });
-
- // make sure the FlinkKafkaProducer instantiates our mock producer
- whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-
- // (1) producer that propagates errors
-
- FlinkKafkaProducer010<String> producerPropagating = new FlinkKafkaProducer010<>(
- "mock_topic", new SimpleStringSchema(), new Properties(), null);
-
- producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
- producerPropagating.open(new Configuration());
-
- try {
- producerPropagating.invoke("value");
- producerPropagating.invoke("value");
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- assertNotNull(e.getCause());
- assertNotNull(e.getCause().getMessage());
- assertTrue(e.getCause().getMessage().contains("Test error"));
- }
-
- // (2) producer that only logs errors
-
- FlinkKafkaProducer010<String> producerLogging = new FlinkKafkaProducer010<>(
- "mock_topic", new SimpleStringSchema(), new Properties(), null);
- producerLogging.setLogFailuresOnly(true);
-
- producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
- producerLogging.open(new Configuration());
-
- producerLogging.invoke("value");
- producerLogging.invoke("value");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 45f0478..af6d254 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,6 +28,8 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -64,6 +66,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String brokerConnectionString = "";
private Properties standardProps;
private Properties additionalServerProperties;
+ private boolean secureMode = false;
+ // Kafka's default of 6 seconds is too small for Travis; use 30 seconds instead.
+ private int zkTimeout = 30000;
public String getBrokerConnectionString() {
return brokerConnectionString;
@@ -75,6 +80,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public Properties getSecureProperties() {
+ Properties prop = new Properties();
+ if(secureMode) {
+ prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+ prop.put("security.protocol", "SASL_PLAINTEXT");
+ prop.put("sasl.kerberos.service.name", "kafka");
+
+ // Use extended timeouts for Travis.
+ prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+ prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+ prop.setProperty("metadata.fetch.timeout.ms","120000");
+ }
+ return prop;
+ }
+
+ @Override
public String getVersion() {
return "0.10";
}
@@ -90,10 +111,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
- return prod;
+ return stream.addSink(prod);
}
@Override
@@ -130,8 +154,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+ public boolean isSecureRunSupported() {
+ return true;
+ }
+
+ @Override
+ public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+ // Increase the timeout, since establishing a secure ZK connection on Travis takes a long time.
+ if(secureMode) {
+ // Run only one Kafka server to avoid multiple ZK connections from many instances (Travis timeouts).
+ numKafkaServers = 1;
+ zkTimeout = zkTimeout * 15;
+ }
+
this.additionalServerProperties = additionalServerProperties;
+ this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir"));
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -151,9 +188,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
brokers = null;
try {
- LOG.info("Starting Zookeeper");
- zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers);
@@ -161,8 +198,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
for (int i = 0; i < numKafkaServers; i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- SocketServer socketServer = brokers.get(i).socketServer();
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ if(secureMode) {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+ } else {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ }
}
LOG.info("ZK and KafkaServer started.");
@@ -177,8 +217,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("auto.commit.enable", "false");
- standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
- standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+ standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+ standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
}
@@ -244,7 +284,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
final long deadline = System.currentTimeMillis() + 30000;
do {
try {
- Thread.sleep(100);
+ if(secureMode) {
+ // Increase the wait time, since ZK timeouts occur frequently on Travis.
+ int wait = zkTimeout / 100;
+ LOG.info("Waiting {} ms before checking whether topic {} exists", wait, topic);
+ Thread.sleep(wait);
+ } else {
+ Thread.sleep(100);
+ }
} catch (InterruptedException e) {
// restore interrupted state
}
@@ -295,8 +342,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
// for CI stability, increase zookeeper session timeout
- kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
- kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+ kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+ kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if(additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
}
@@ -306,6 +353,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
for (int i = 1; i <= numTries; i++) {
int kafkaPort = NetUtils.getAvailablePort();
kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+ // Configure the broker for secure (SASL_PLAINTEXT) access.
+ if(secureMode) {
+ LOG.info("Adding Kafka secure configurations");
+ kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.putAll(getSecureProperties());
+ }
+
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
try {
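As a usage note, a client connecting to a broker brought up in this secure mode needs matching settings. Here is a hedged sketch, assuming a Kerberos JAAS configuration is supplied separately via -Djava.security.auth.login.config; the topic name and broker address are placeholders, not values from this commit.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class SecureConsumerSketch {
    public static FlinkKafkaConsumer010<String> createConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "secure-test");
        // Mirror getSecureProperties() above: SASL over plaintext, "kafka" service name.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        // Generous ZK timeouts, matching the Travis-friendly settings above.
        props.setProperty("zookeeper.session.timeout.ms", "30000");
        props.setProperty("zookeeper.connection.timeout.ms", "30000");
        return new FlinkKafkaConsumer010<>("secure-topic", new SimpleStringSchema(), props);
    }
}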
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 35e491a..1302348 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
continue partitionsLoop;
}
- owner.emitRecord(value, currentPartition, offset);
+ owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE); // Kafka 0.8 records carry no timestamp
}
else {
// no longer running
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cbf3d06..a0d5002 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,6 +31,9 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -101,10 +104,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
- return prod;
+ return stream.addSink(prod);
}
@Override
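The getProducer-to-produceIntoKafka refactor moves sink attachment into the version-specific test environment, so shared tests no longer need to know which producer class to instantiate. A hedged sketch of a call site follows; the stream contents and topic name are illustrative, while kafkaServer, env, and standardProps are the fields the test bases provide.

// Inside a test method of a KafkaTestBase subclass:
DataStream<String> stream = env.fromElements("a", "b", "c");
kafkaServer.produceIntoKafka(
        stream, "demo-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        standardProps, null) // null partitioner: let the producer pick the partition
    .setParallelism(1);
env.execute("Write demo records");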
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9708777..a97476a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -81,11 +81,11 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
// ------------------------------------------------------------------------
/** User-supplied properties for Kafka **/
- private final Properties properties;
+ protected final Properties properties;
/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
* available. If 0, returns immediately with any records that are available now */
- private final long pollTimeout;
+ protected final long pollTimeout;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index eb3440a..2a3e39d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -27,7 +27,7 @@ import java.util.Properties;
/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
*
* Please note that this producer does not have any reliability guarantees.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index aaec9dc..37e40fc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -131,7 +131,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
// rather than running the main fetch loop directly here, we spawn a dedicated thread
// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
- Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+ Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks());
runner.setDaemon(true);
runner.start();
@@ -183,7 +183,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
// from here on, the consumer will be closed properly
try {
- consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+ assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
+
if (useMetrics) {
final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
@@ -250,7 +251,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
// emit the actual record. this also update offset state atomically
// and deals with timestamps and watermark generation
- emitRecord(value, partition, record.offset());
+ emitRecord(value, partition, record.offset(), record);
}
}
}
@@ -274,6 +275,21 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
}
+ // Kafka09Fetcher ignores the record timestamp; Kafka010Fetcher overrides this method to extract the timestamp and pass it on to emitRecord().
+ protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+ emitRecord(record, partition, offset, Long.MIN_VALUE);
+ }
+ /**
+ * Protected method to make the partition assignment pluggable for different Kafka versions.
+ */
+ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+ consumer.assign(topicPartitions);
+ }
+
+ protected String getFetcherName() {
+ return "Kafka 0.9 Fetcher";
+ }
+
// ------------------------------------------------------------------------
// Kafka 0.9 specific fetcher behavior
// ------------------------------------------------------------------------