Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 10:31:07 UTC
[09/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
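For context, here is a minimal usage sketch (not part of this commit) of the per-partition watermark API that this change introduces via setPeriodicWatermarkEmitter() in FlinkKafkaConsumerBase below. The broker address, group id, topic name, and the assumed "<timestamp>,<payload>" record format are illustrative:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PerPartitionWatermarkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "example-group");           // assumed group id

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("example-topic", new SimpleStringSchema(), props);

        // The assigner is shipped in serialized form and deserialized into one copy
        // per Kafka partition, so per-partition timestamp characteristics
        // (e.g. ascending timestamps within each partition) can be exploited.
        consumer.setPeriodicWatermarkEmitter(new AssignerWithPeriodicWatermarks<String>() {

            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // assumes records of the form "<timestamp>,<payload>"
                long ts = Long.parseLong(element.substring(0, element.indexOf(',')));
                maxTimestamp = Math.max(maxTimestamp, ts);
                return ts;
            }

            @Override
            public Watermark getCurrentWatermark() {
                return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1);
            }
        });

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("Per-partition watermark example");
    }
}

Keeping the assigner serialized until the fetcher deserializes a copy per partition is the "WatermarkExtractor object per partition" named in the commit subject.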
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
new file mode 100644
index 0000000..6bad180
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects */
+ private final KeyedDeserializationSchema<T> deserializer;
+
+ /** The subtask's runtime context */
+ private final RuntimeContext runtimeContext;
+
+ /** The configuration for the Kafka consumer */
+ private final Properties kafkaProperties;
+
+ /** The maximum number of milliseconds to wait for a fetch batch */
+ private final long pollTimeout;
+
+ /** Flag whether to register Kafka metrics as Flink accumulators */
+ private final boolean forwardKafkaMetrics;
+
+ /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
+ private final Object consumerLock = new Object();
+
+ /** Reference to the Kafka consumer, once it is created */
+ private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+ /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
+ private volatile ExceptionProxy errorHandler;
+
+ /** Flag to mark the main work loop as alive */
+ private volatile boolean running = true;
+
+ // ------------------------------------------------------------------------
+
+ public Kafka09Fetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext,
+ KeyedDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ boolean forwardKafkaMetrics) throws Exception
+ {
+ super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+
+ this.deserializer = deserializer;
+ this.runtimeContext = runtimeContext;
+ this.kafkaProperties = kafkaProperties;
+ this.pollTimeout = pollTimeout;
+ this.forwardKafkaMetrics = forwardKafkaMetrics;
+
+ // if checkpointing is enabled, we are not automatically committing to Kafka.
+ kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+ }
+
+ // ------------------------------------------------------------------------
+ // Fetcher work methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ this.errorHandler = new ExceptionProxy(Thread.currentThread());
+
+ // rather than running the main fetch loop directly here, we spawn a dedicated thread
+ // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
+ Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+ runner.setDaemon(true);
+ runner.start();
+
+ try {
+ runner.join();
+ } catch (InterruptedException e) {
+ // may be the result of a wake-up after an exception. we ignore this here and only
+ // restore the interruption state
+ Thread.currentThread().interrupt();
+ }
+
+ // make sure we propagate any exception that occurred in the concurrent fetch thread,
+ // before leaving this method
+ this.errorHandler.checkAndThrowException();
+ }
+
+ @Override
+ public void cancel() {
+ // flag the main thread to exit
+ running = false;
+
+ // NOTE:
+ // - We cannot interrupt the runner thread, because the Kafka consumer may
+ // deadlock when the thread is interrupted while in certain methods
+ // - We cannot call close() on the consumer, because it will actually throw
+ // an exception if a concurrent call is in progress
+
+ // make sure the consumer finds out faster that we are shutting down
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ }
+
+ @Override
+ public void run() {
+ // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+ // This is important, because the consumer has multi-threading issues,
+ // including concurrent 'close()' calls.
+
+ final KafkaConsumer<byte[], byte[]> consumer;
+ try {
+ consumer = new KafkaConsumer<>(kafkaProperties);
+ }
+ catch (Throwable t) {
+ running = false;
+ errorHandler.reportError(t);
+ return;
+ }
+
+ // from here on, the consumer will be closed properly
+ try {
+ consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+
+ // register Kafka metrics to Flink accumulators
+ if (forwardKafkaMetrics) {
+ Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ LOG.info("Consumer implementation does not support metrics");
+ } else {
+ // we have metrics, register them where possible
+ for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
+ String name = "KafkaConsumer-" + metric.getKey().name();
+ DefaultKafkaMetricAccumulator kafkaAccumulator =
+ DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+
+ // best effort: we only add the accumulator if available.
+ if (kafkaAccumulator != null) {
+ runtimeContext.addAccumulator(name, kafkaAccumulator);
+ }
+ }
+ }
+ }
+
+ // seek the consumer to the initial offsets
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+ if (partition.isOffsetDefined()) {
+ consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+ }
+ }
+
+ // from now on, external operations may call the consumer
+ this.consumer = consumer;
+
+ // main fetch loop
+ while (running) {
+ // get the next batch of records
+ final ConsumerRecords<byte[], byte[]> records;
+ synchronized (consumerLock) {
+ try {
+ records = consumer.poll(pollTimeout);
+ }
+ catch (WakeupException we) {
+ if (running) {
+ throw we;
+ } else {
+ continue;
+ }
+ }
+ }
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+
+ List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+ T value = deserializer.deserialize(
+ record.key(), record.value(),
+ record.topic(), record.partition(), record.offset());
+
+ if (deserializer.isEndOfStream(value)) {
+ // end of stream signaled
+ running = false;
+ break;
+ }
+
+ // emit the actual record. this also updates offset state atomically
+ // and deals with timestamps and watermark generation
+ emitRecord(value, partition, record.offset());
+ }
+ }
+ }
+ // end main fetch loop
+ }
+ catch (Throwable t) {
+ if (running) {
+ running = false;
+ errorHandler.reportError(t);
+ } else {
+ LOG.debug("Stopped ConsumerThread threw exception", t);
+ }
+ }
+ finally {
+ try {
+ synchronized (consumerLock) {
+ consumer.close();
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error while closing Kafka 0.9 consumer", t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka 0.9 specific fetcher behavior
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ return new TopicPartition(partition.getTopic(), partition.getPartition());
+ }
+
+ @Override
+ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
+
+ for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+ Long offset = offsets.get(partition.getKafkaTopicPartition());
+ if (offset != null) {
+ offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
+ }
+ }
+
+ if (this.consumer != null) {
+ synchronized (consumerLock) {
+ this.consumer.commitSync(offsetsToCommit);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+ ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+ for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+ result.add(p.getKafkaPartitionHandle());
+ }
+ return result;
+ }
+}
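The run()/cancel() pair above confines the KafkaConsumer to a dedicated thread that is never interrupted; shutdown is signaled via the volatile running flag plus consumer.wakeup(), after which poll() surfaces a WakeupException. A condensed, hypothetical sketch of just that pattern (topic, partition, and properties are assumptions; the real fetcher additionally synchronizes on a consumer lock):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupBasedCancellation implements Runnable {

    // props must contain bootstrap.servers and byte[] key/value deserializers (assumed configured)
    private final Properties props;

    private volatile boolean running = true;
    private volatile KafkaConsumer<byte[], byte[]> consumer;

    public WakeupBasedCancellation(Properties props) {
        this.props = props;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(props);
        try {
            consumer.assign(Collections.singletonList(new TopicPartition("example-topic", 0)));
            while (running) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
                    // ... process records, as the fetcher does under the checkpoint lock ...
                } catch (WakeupException e) {
                    // expected when cancel() was called; only a real error if still running
                    if (running) {
                        throw e;
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    public void cancel() {
        running = false;
        // wakeup() is the one consumer method that is safe to call from another thread;
        // it makes a blocking poll() return by throwing a WakeupException
        if (consumer != null) {
            consumer.wakeup();
        }
    }
}

Interrupting the polling thread or calling close() concurrently could deadlock or throw, which is exactly what the NOTE in cancel() warns about.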
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 82e1dce..afb0056 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -27,11 +27,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
// ------------------------------------------------------------------------
@Test(timeout = 60000)
- public void testCheckpointing() throws Exception {
- runCheckpointingTest();
- }
-
- @Test(timeout = 60000)
public void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
@@ -41,15 +36,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testPunctuatedExplicitWMConsumer() throws Exception {
- runExplicitPunctuatedWMgeneratingConsumerTest(false);
- }
+// @Test(timeout = 60000)
+// public void testPunctuatedExplicitWMConsumer() throws Exception {
+// runExplicitPunctuatedWMgeneratingConsumerTest(false);
+// }
- @Test(timeout = 60000)
- public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
- runExplicitPunctuatedWMgeneratingConsumerTest(true);
- }
+// @Test(timeout = 60000)
+// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+// runExplicitPunctuatedWMgeneratingConsumerTest(true);
+// }
@Test(timeout = 60000)
public void testKeyValueSupport() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index a2c4f73..b80a231 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -22,19 +22,23 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLogger;
+
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
+
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;
@@ -60,7 +64,7 @@ public class KafkaProducerTest extends TestLogger {
// partition setup
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
- Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+ Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
// failure when trying to send an element
when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
index 74b35af..c1b21b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.junit.Test;
+@SuppressWarnings("serial")
public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
@Test(timeout=60000)
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index 6bdfb48..fbeb110 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d9e813f..0ca8fd5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,427 +18,291 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.ExecutionConfig;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument;
-
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
- implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>, Triggerable {
-
- // ------------------------------------------------------------------------
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ *
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
+ * {@link AbstractFetcher}.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
+ CheckpointListener,
+ CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
+ ResultTypeQueryable<T>
+{
private static final long serialVersionUID = -6272159445203409112L;
- /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
- * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
- public static final long OFFSET_NOT_SET = -915623761776L;
-
+ protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
- /** The schema to convert between Kafka#s byte messages, and Flink's objects */
+ // ------------------------------------------------------------------------
+ // configuration state, set on the client relevant for all subtasks
+ // ------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects */
protected final KeyedDeserializationSchema<T> deserializer;
- // ------ Runtime State -------
+ /** The set of topic partitions that the source will read */
+ protected List<KafkaTopicPartition> allSubscribedPartitions;
+
+ /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form, to deserialize it into multiple copies */
+ private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
+
+ /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+ * to exploit per-partition timestamp characteristics.
+ * The assigner is kept in serialized form, to deserialize it into multiple copies */
+ private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
+ // ------------------------------------------------------------------------
+ // runtime state (used individually by each parallel subtask)
+ // ------------------------------------------------------------------------
+
/** Data for pending but uncommitted checkpoints */
- protected final LinkedMap pendingCheckpoints = new LinkedMap();
-
- /**
- * Information about the partitions being read by the local consumer. This contains:
- * offsets of the last returned elements, and if a timestamp assigner is used, it
- * also contains the maximum seen timestamp in the partition and if the partition
- * still receives elements or it is inactive.
- */
- protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState;
+ private final LinkedMap pendingCheckpoints = new LinkedMap();
+ /** The fetcher implements the connections to the Kafka brokers */
+ private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+
/** The offsets to restore to, if the consumer restores state from a checkpoint */
- protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
-
+ private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+
/** Flag indicating whether the consumer is still running **/
- protected volatile boolean running = true;
+ private volatile boolean running = true;
// ------------------------------------------------------------------------
- // WATERMARK EMISSION
- // ------------------------------------------------------------------------
/**
- * The user-specified methods to extract the timestamps from the records in Kafka, and
- * to decide when to emit watermarks.
- */
- private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner;
-
- /**
- * The user-specified methods to extract the timestamps from the records in Kafka, and
- * to decide when to emit watermarks.
- */
- private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
-
- private StreamingRuntimeContext runtime = null;
-
- private SourceContext<T> srcContext = null;
-
- /**
- * The interval between consecutive periodic watermark emissions,
- * as configured via the {@link ExecutionConfig#getAutoWatermarkInterval()}.
- */
- private long watermarkInterval = -1;
-
- /** The last emitted watermark. */
- private long lastEmittedWatermark = Long.MIN_VALUE;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
- *
- * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
- * at the beginning of this class.</p>
+ * Base constructor.
*
* @param deserializer
* The deserializer to turn raw byte messages into Java/Scala objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
*/
- public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
- this.deserializer = requireNonNull(deserializer, "valueDeserializer");
+ public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer) {
+ this.deserializer = checkNotNull(deserializer, "valueDeserializer");
}
/**
- * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. Bare in mind
- * that the source can either have an {@link AssignerWithPunctuatedWatermarks} or an
- * {@link AssignerWithPeriodicWatermarks}, not both.
+ * This method must be called from the subclasses, to set the list of all subscribed partitions
+ * that this consumer will fetch from (across all subtasks).
+ *
+ * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
*/
- public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
- checkEmitterDuringInit();
- this.punctuatedWatermarkAssigner = assigner;
- return this;
+ protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+ checkNotNull(allSubscribedPartitions);
+ this.allSubscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
}
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
+
/**
- * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically. Bare in mind that the
- * source can either have an {@link AssignerWithPunctuatedWatermarks} or an
- * {@link AssignerWithPeriodicWatermarks}, not both.
+ * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+ * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+ * in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+ * the streams from the partitions are unioned in a "first come, first served" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+ * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+ * parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+ * partition, allows users to let them exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+ * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
*/
- public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
- checkEmitterDuringInit();
- this.periodicWatermarkAssigner = assigner;
- return this;
- }
-
- /**
- * Processes the element after having been read from Kafka and deserialized, and updates the
- * last read offset for the specifies partition. These two actions should be performed in
- * an atomic way in order to guarantee exactly once semantics.
- * @param sourceContext
- * The context the task operates in.
- * @param partDescriptor
- * A descriptor containing the topic and the id of the partition.
- * @param value
- * The element to process.
- * @param offset
- * The offset of the element in the partition.
- * */
- public void processElement(SourceContext<T> sourceContext, KafkaTopicPartition partDescriptor, T value, long offset) {
- if (punctuatedWatermarkAssigner == null && periodicWatermarkAssigner == null) {
- // the case where no watermark emitter is specified.
- sourceContext.collect(value);
- } else {
-
- if (srcContext == null) {
- srcContext = sourceContext;
- }
-
- long extractedTimestamp = extractTimestampAndEmitElement(partDescriptor, value);
-
- // depending on the specified watermark emitter, either send a punctuated watermark,
- // or set the timer for the first periodic watermark. In the periodic case, we set the timer
- // only for the first watermark, as it is the trigger() that will set the subsequent ones.
-
- if (punctuatedWatermarkAssigner != null) {
- final Watermark nextWatermark = punctuatedWatermarkAssigner
- .checkAndGetNextWatermark(value, extractedTimestamp);
- if (nextWatermark != null) {
- emitWatermarkIfMarkingProgress(sourceContext);
- }
- } else if(periodicWatermarkAssigner != null && runtime == null) {
- runtime = (StreamingRuntimeContext) getRuntimeContext();
- watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
- if (watermarkInterval > 0) {
- runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this);
- }
- }
+ public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.periodicWatermarkAssigner != null) {
+ throw new IllegalStateException("A periodic watermark emitter has already been set.");
+ }
+ try {
+ this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
+ return this;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
}
- updateOffsetForPartition(partDescriptor, offset);
}
/**
- * Extract the timestamp from the element based on the user-specified extractor,
- * emit the element with the new timestamp, and update the partition monitoring info (if necessary).
- * In more detail, upon reception of an element with a timestamp greater than the greatest timestamp
- * seen so far in that partition, this method updates the maximum timestamp seen for that partition,
- * and marks the partition as {@code active}, i.e. it still receives fresh data.
- * @param partDescriptor the partition the new element belongs to.
- * @param value the element to be forwarded.
- * @return the timestamp of the new element.
+ * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
+ * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+ * in the same way as in the Flink runtime, when streams are merged.
+ *
+ * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+ * the streams from the partitions are unioned in a "first come, first served" fashion. Per-partition
+ * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+ * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+ * parallel source subtask reads more than one partition.
+ *
+ * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+ * partition, allows users to let them exploit the per-partition characteristics.
+ *
+ * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+ * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+ *
+ * @param assigner The timestamp assigner / watermark generator to use.
+ * @return The consumer object, to allow function chaining.
*/
- private long extractTimestampAndEmitElement(KafkaTopicPartition partDescriptor, T value) {
- long extractedTimestamp = getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE);
- srcContext.collectWithTimestamp(value, extractedTimestamp);
- updateMaximumTimestampForPartition(partDescriptor, extractedTimestamp);
- return extractedTimestamp;
- }
-
- /**
- * Upon reception of an element with a timestamp greater than the greatest timestamp seen so far in the partition,
- * this method updates the maximum timestamp seen for that partition to {@code timestamp}, and marks the partition
- * as {@code active}, i.e. it still receives fresh data. If the partition is not known to the system, then a new
- * {@link KafkaPartitionState} is created and is associated to the new partition for future monitoring.
- * @param partDescriptor
- * A descriptor containing the topic and the id of the partition.
- * @param timestamp
- * The timestamp to set the minimum to, if smaller than the already existing one.
- * @return {@code true} if the minimum was updated successfully to {@code timestamp}, {@code false}
- * if the previous value is smaller than the provided timestamp
- * */
- private boolean updateMaximumTimestampForPartition(KafkaTopicPartition partDescriptor, long timestamp) {
- KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-
- if(timestamp > info.getMaxTimestamp()) {
-
- // the flag is set to false as soon as the current partition's max timestamp is sent as a watermark.
- // if then, and for that partition, only late elements arrive, then the max timestamp will stay the
- // same, and it will keep the overall system from progressing.
- // To avoid this, we only mark a partition as active on non-late elements.
-
- info.setActive(true);
- info.setMaxTimestamp(timestamp);
- return true;
+ public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+ checkNotNull(assigner);
+
+ if (this.punctuatedWatermarkAssigner != null) {
+ throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+ }
+ try {
+ this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
+ return this;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("The given assigner is not serializable", e);
}
- return false;
}
- /**
- * Updates the last read offset for the partition specified by the {@code partDescriptor} to {@code offset}.
- * If it is the first time we see the partition, then a new {@link KafkaPartitionState} is created to monitor
- * this specific partition.
- * @param partDescriptor the partition whose info to update.
- * @param offset the last read offset of the partition.
- */
- public void updateOffsetForPartition(KafkaTopicPartition partDescriptor, long offset) {
- KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
- info.setOffset(offset);
- }
+ // ------------------------------------------------------------------------
+ // Work methods
+ // ------------------------------------------------------------------------
@Override
- public void trigger(long timestamp) throws Exception {
- if(this.srcContext == null) {
- // if the trigger is called before any elements, then we
- // just set the next timer to fire when it should and we
- // ignore the triggering as this would produce no results.
- setNextWatermarkTimer();
- return;
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ if (allSubscribedPartitions == null) {
+ throw new Exception("The partitions were not set for the consumer");
}
-
- // this is valid because this method is only called when watermarks
- // are set to be emitted periodically.
- final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark();
- if(nextWatermark != null) {
- emitWatermarkIfMarkingProgress(srcContext);
- }
- setNextWatermarkTimer();
- }
-
- /**
- * Emits a new watermark, with timestamp equal to the minimum across all the maximum timestamps
- * seen per local partition (across all topics). The new watermark is emitted if and only if
- * it signals progress in event-time, i.e. if its timestamp is greater than the timestamp of
- * the last emitted watermark. In addition, this method marks as inactive the partition whose
- * timestamp was emitted as watermark, i.e. the one with the minimum across the maximum timestamps
- * of the local partitions. This is done to avoid not making progress because
- * a partition stopped receiving data. The partition is going to be marked as {@code active}
- * as soon as the <i>next non-late</i> element arrives.
- *
- * @return {@code true} if the Watermark was successfully emitted, {@code false} otherwise.
- */
- private boolean emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) {
- Tuple2<KafkaTopicPartition, Long> globalMinTs = getMinTimestampAcrossAllTopics();
- if(globalMinTs.f0 != null ) {
- synchronized (sourceContext.getCheckpointLock()) {
- long minTs = globalMinTs.f1;
- if(minTs > lastEmittedWatermark) {
- lastEmittedWatermark = minTs;
- Watermark toEmit = new Watermark(minTs);
- sourceContext.emitWatermark(toEmit);
- return true;
- }
+
+ // figure out which partitions this subtask should process
+ final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions,
+ getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+
+ // we need only do work, if we actually have partitions assigned
+ if (!thisSubtaskPartitions.isEmpty()) {
+
+ // (1) create the fetcher that will communicate with the Kafka brokers
+ final AbstractFetcher<T, ?> fetcher = createFetcher(
+ sourceContext, thisSubtaskPartitions,
+ periodicWatermarkAssigner, punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext());
+
+ // (2) set the fetcher to the restored checkpoint offsets
+ if (restoreToOffset != null) {
+ fetcher.restoreOffsets(restoreToOffset);
}
- }
- return false;
- }
- /**
- * Kafka sources with timestamp extractors are expected to keep the maximum timestamp seen per
- * partition they are reading from. This is to mark the per-partition event-time progress.
- *
- * This method iterates this list, and returns the minimum timestamp across these per-partition
- * max timestamps, and across all topics. In addition to this information, it also returns the topic and
- * the partition within the topic the timestamp belongs to.
- */
- private Tuple2<KafkaTopicPartition, Long> getMinTimestampAcrossAllTopics() {
- Tuple2<KafkaTopicPartition, Long> minTimestamp = new Tuple2<>(null, Long.MAX_VALUE);
- for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entries: partitionState.entrySet()) {
- KafkaTopicPartition part = entries.getKey();
- KafkaPartitionState info = entries.getValue();
-
- if(partitionIsActive(part) && info.getMaxTimestamp() < minTimestamp.f1) {
- minTimestamp.f0 = part;
- minTimestamp.f1 = info.getMaxTimestamp();
+ // publish the reference, for snapshot-, commit-, and cancel calls
+ // IMPORTANT: We can only do that now, because only now will calls to
+ // the fetcher's 'snapshotCurrentState()' method return at least
+ // the restored offsets
+ this.kafkaFetcher = fetcher;
+ if (!running) {
+ return;
}
+
+ // (3) run the fetcher's main work method
+ fetcher.runFetchLoop();
}
-
- if(minTimestamp.f0 != null) {
- // it means that we have a winner and we have to set its flag to
- // inactive, until its next non-late element.
- KafkaTopicPartition partitionDescriptor = minTimestamp.f0;
- setActiveFlagForPartition(partitionDescriptor, false);
- }
-
- return minTimestamp;
- }
-
- /**
- * Sets the {@code active} flag for a given partition of a topic to {@code isActive}.
- * This flag signals if the partition is still receiving data and it is used to avoid the case
- * where a partition stops receiving data, so its max seen timestamp does not advance, and it
- * holds back the progress of the watermark for all partitions. Note that if the partition is
- * not known to the system, then a new {@link KafkaPartitionState} is created and is associated
- * to the new partition for future monitoring.
- *
- * @param partDescriptor
- * A descriptor containing the topic and the id of the partition.
- * @param isActive
- * The value {@code true} or {@code false} to set the flag to.
- */
- private void setActiveFlagForPartition(KafkaTopicPartition partDescriptor, boolean isActive) {
- KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
- info.setActive(isActive);
- }
-
- /**
- * Gets the statistics for a given partition specified by the {@code partition} argument.
- * If it is the first time we see this partition, a new {@link KafkaPartitionState} data structure
- * is initialized to monitor it from now on. This method never throws a {@link NullPointerException}.
- * @param partition the partition to be fetched.
- * @return the gathered statistics for that partition.
- * */
- private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition partition) {
- KafkaPartitionState info = partitionState.get(partition);
- if(info == null) {
- info = new KafkaPartitionState(partition.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET);
- partitionState.put(partition, info);
+ else {
+ // this source never completes, so emit a Long.MAX_VALUE watermark
+ // to not block watermark forwarding
+ sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+ // wait until this is canceled
+ final Object waitLock = new Object();
+ while (running) {
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (waitLock) {
+ waitLock.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ if (!running) {
+ // restore the interrupted state, and fall through the loop
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
- return info;
}
- /**
- * Checks if a partition of a topic is still active, i.e. if it still receives data.
- * @param partDescriptor
- * A descriptor containing the topic and the id of the partition.
- * */
- private boolean partitionIsActive(KafkaTopicPartition partDescriptor) {
- KafkaPartitionState info = partitionState.get(partDescriptor);
- if(info == null) {
- throw new RuntimeException("Unknown Partition: Topic=" + partDescriptor.getTopic() +
- " Partition=" + partDescriptor.getPartition());
+ @Override
+ public void cancel() {
+ // set ourselves as not running
+ running = false;
+
+ // abort the fetcher, if there is one
+ if (kafkaFetcher != null) {
+ kafkaFetcher.cancel();
}
- return info.isActive();
- }
- private TimestampAssigner<T> getTimestampAssigner() {
- checkEmitterStateAfterInit();
- return periodicWatermarkAssigner != null ? periodicWatermarkAssigner : punctuatedWatermarkAssigner;
- }
-
- private void setNextWatermarkTimer() {
- long timeToNextWatermark = System.currentTimeMillis() + watermarkInterval;
- runtime.registerTimer(timeToNextWatermark, this);
- }
-
- private void checkEmitterDuringInit() {
- if(periodicWatermarkAssigner != null) {
- throw new RuntimeException("A periodic watermark emitter has already been provided.");
- } else if(punctuatedWatermarkAssigner != null) {
- throw new RuntimeException("A punctuated watermark emitter has already been provided.");
- }
+ // there will be an interrupt() call to the main thread anyways
}
- private void checkEmitterStateAfterInit() {
- if(periodicWatermarkAssigner == null && punctuatedWatermarkAssigner == null) {
- throw new RuntimeException("The timestamp assigner has not been initialized.");
- } else if(periodicWatermarkAssigner != null && punctuatedWatermarkAssigner != null) {
- throw new RuntimeException("The source can either have an assigner with punctuated " +
- "watermarks or one with periodic watermarks, not both.");
+ @Override
+ public void close() throws Exception {
+ // pretty much the same logic as cancelling
+ try {
+ cancel();
+ } finally {
+ super.close();
}
}
-
+
// ------------------------------------------------------------------------
// Checkpoint and restore
// ------------------------------------------------------------------------
-
- HashMap<KafkaTopicPartition, KafkaPartitionState> restoreInfoFromCheckpoint() {
- HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = new HashMap<>(restoreToOffset.size());
- for(Map.Entry<KafkaTopicPartition, Long> offsets: restoreToOffset.entrySet()) {
- KafkaTopicPartition key = offsets.getKey();
- partInfo.put(key, new KafkaPartitionState(key.getPartition(), offsets.getValue()));
- }
- return partInfo;
- }
-
+
@Override
public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- if (partitionState == null) {
- LOG.debug("snapshotState() requested on not yet opened source; returning null.");
- return null;
- }
if (!running) {
LOG.debug("snapshotState() called on closed source");
return null;
}
-
- HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>();
- for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: partitionState.entrySet()) {
- currentOffsets.put(entry.getKey(), entry.getValue().getOffset());
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets
+ return restoreToOffset;
}
+ HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
KafkaTopicPartition.toString(currentOffsets), checkpointId, checkpointTimestamp);
@@ -447,7 +311,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingCheckpoints.put(checkpointId, currentOffsets);
-
+
+ // truncate the map, to prevent infinite growth
while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingCheckpoints.remove(0);
}
@@ -457,51 +322,49 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
- LOG.info("Setting restore state in Kafka");
+ LOG.info("Setting restore state in the FlinkKafkaConsumer");
restoreToOffset = restoredOffsets;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (partitionState == null) {
- LOG.debug("notifyCheckpointComplete() called on uninitialized source");
- return;
- }
if (!running) {
LOG.debug("notifyCheckpointComplete() called on closed source");
return;
}
+
+ final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
// only one commit operation must be in progress
if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+ LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
}
try {
- HashMap<KafkaTopicPartition, Long> checkpointOffsets;
-
- // the map may be asynchronously updates when snapshotting state, so we synchronize
- synchronized (pendingCheckpoints) {
- final int posInMap = pendingCheckpoints.indexOf(checkpointId);
- if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
- return;
- }
+ final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
- //noinspection unchecked
- checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+ @SuppressWarnings("unchecked")
+ HashMap<KafkaTopicPartition, Long> checkpointOffsets =
+ (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
-
- // remove older checkpoints in map
- for (int i = 0; i < posInMap; i++) {
- pendingCheckpoints.remove(0);
- }
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingCheckpoints.remove(0);
}
+
if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
- commitOffsets(checkpointOffsets);
+ fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
}
catch (Exception e) {
if (running) {
@@ -511,33 +374,77 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
}
- protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception;
-
-
+ // ------------------------------------------------------------------------
+ // Kafka Consumer specific methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
+ * data, and emits it into the data streams.
+ *
+ * @param sourceContext The source context to emit data to.
+ * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
+ * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
+ * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
+ * @param runtimeContext The task's runtime context.
+ *
+ * @return The instantiated fetcher
+ *
+ * @throws Exception The method should forward exceptions
+ */
+ protected abstract AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> thisSubtaskPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext) throws Exception;
+
+ // ------------------------------------------------------------------------
+ // ResultTypeQueryable methods
+ // ------------------------------------------------------------------------
+
@Override
public TypeInformation<T> getProducedType() {
return deserializer.getProducedType();
}
- protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) {
- checkArgument(numConsumers > 0);
- checkArgument(consumerIndex < numConsumers);
-
- List<T> partitionsToSub = new ArrayList<>();
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
- for (int i = 0; i < partitions.size(); i++) {
+ /**
+ * Selects which of the given partitions should be handled by a specific consumer,
+ * given a certain number of consumers.
+ *
+ * @param allPartitions The partitions to select from
+ * @param numConsumers The number of consumers
+ * @param consumerIndex The index of the specific consumer
+ *
+ * @return The sublist of partitions to be handled by that consumer.
+ */
+ protected static List<KafkaTopicPartition> assignPartitions(
+ List<KafkaTopicPartition> allPartitions,
+ int numConsumers, int consumerIndex)
+ {
+ final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
+ allPartitions.size() / numConsumers + 1);
+
+ for (int i = 0; i < allPartitions.size(); i++) {
if (i % numConsumers == consumerIndex) {
- partitionsToSub.add(partitions.get(i));
+ thisSubtaskPartitions.add(allPartitions.get(i));
}
}
- return partitionsToSub;
+
+ return thisSubtaskPartitions;
}
-
+
/**
- * Method to log partition information.
+ * Logs the partition information in INFO level.
+ *
+ * @param logger The logger to log to.
* @param partitionInfos List of subscribed partitions
*/
- public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) {
+ protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
Map<String, Integer> countPerTopic = new HashMap<>();
for (KafkaTopicPartition partition : partitionInfos) {
Integer count = countPerTopic.get(partition.getTopic());
@@ -548,12 +455,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
countPerTopic.put(partition.getTopic(), count);
}
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder(
+ "Consumer is going to read the following topics (with number of partitions): ");
+
for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
}
- LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
+
+ logger.info(sb.toString());
}
-
-
}
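For illustration (not from the commit), the round-robin scheme that assignPartitions() implements, reduced to integers: partition i is handled by the subtask with index i % numConsumers, so partitions spread evenly without any coordination between subtasks.

import java.util.ArrayList;
import java.util.List;

public class AssignPartitionsSketch {

    // Mirrors the modulo-based selection in FlinkKafkaConsumerBase.assignPartitions(),
    // with KafkaTopicPartition replaced by Integer for brevity.
    static List<Integer> assignPartitions(List<Integer> allPartitions, int numConsumers, int consumerIndex) {
        List<Integer> thisSubtaskPartitions = new ArrayList<>(allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }

    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < 5; p++) {
            partitions.add(p);
        }
        System.out.println(assignPartitions(partitions, 2, 0)); // [0, 2, 4]
        System.out.println(assignPartitions(partitions, 2, 1)); // [1, 3]
    }
}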
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..594aa66
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka brokers and
+ * pull records from Kafka partitions.
+ *
+ * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
+ * as well as around the optional timestamp assignment and watermark generation.
+ *
+ * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
+ * the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+
+ private static final int NO_TIMESTAMPS_WATERMARKS = 0;
+ private static final int PERIODIC_WATERMARKS = 1;
+ private static final int PUNCTUATED_WATERMARKS = 2;
+
+ // ------------------------------------------------------------------------
+
+ /** The source context to emit records and watermarks to */
+ private final SourceContext<T> sourceContext;
+
+ /** The lock that guarantees that record emission and state updates are atomic,
+ * from the view of taking a checkpoint */
+ private final Object checkpointLock;
+
+ /** All partitions (and their state) that this fetcher is subscribed to */
+ private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+ /** The mode describing whether the fetcher also generates timestamps and watermarks */
+ private final int timestampWatermarkMode;
+
+ /** Only relevant for punctuated watermarks: the current cross-partition watermark */
+ private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+ // ------------------------------------------------------------------------
+
+ protected AbstractFetcher(
+ SourceContext<T> sourceContext,
+ List<KafkaTopicPartition> assignedPartitions,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ StreamingRuntimeContext runtimeContext) throws Exception
+ {
+ this.sourceContext = checkNotNull(sourceContext);
+ this.checkpointLock = sourceContext.getCheckpointLock();
+
+ // figure out which watermark mode we will be using
+
+ if (watermarksPeriodic == null) {
+ if (watermarksPunctuated == null) {
+ // simple case, no watermarks involved
+ timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
+ } else {
+ timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+ }
+ } else {
+ if (watermarksPunctuated == null) {
+ timestampWatermarkMode = PERIODIC_WATERMARKS;
+ } else {
+ throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
+ }
+ }
+
+ // create our partition state according to the timestamp/watermark mode
+ this.allPartitions = initializePartitions(
+ assignedPartitions,
+ timestampWatermarkMode,
+ watermarksPeriodic, watermarksPunctuated,
+ runtimeContext.getUserCodeClassLoader());
+
+ // if we have periodic watermarks, kick off the interval scheduler
+ if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+
+ PeriodicWatermarkEmitter periodicEmitter =
+ new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
+ periodicEmitter.start();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets all partitions (with partition state) that this fetcher is subscribed to.
+ *
+ * @return All subscribed partitions.
+ */
+ protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+ return allPartitions;
+ }
+
+ // ------------------------------------------------------------------------
+ // Core fetcher work methods
+ // ------------------------------------------------------------------------
+
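+ /**
+ * Runs the fetch loop: implementations connect to the Kafka brokers, pull records
+ * for the subscribed partitions, and emit them via emitRecord(...).
+ *
+ * @throws Exception Forwards all exceptions from the fetch loop.
+ */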
+ public abstract void runFetchLoop() throws Exception;
+
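+ /**
+ * Cancels the fetch loop, causing {@link #runFetchLoop()} to return.
+ */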
+ public abstract void cancel();
+
+ // ------------------------------------------------------------------------
+ // Kafka version specifics
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates the Kafka version specific representation of the given
+ * topic partition.
+ *
+ * @param partition The Flink representation of the Kafka topic partition.
+ * @return The specific Kafka representation of the Kafka topic partition.
+ */
+ public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
+
+ /**
+ * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
+ * older Kafka versions).
+ *
+ * @param offsets The offsets to commit to Kafka.
+ * @throws Exception This method forwards exceptions.
+ */
+ public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+
+ // ------------------------------------------------------------------------
+ // snapshot and restore the state
+ // ------------------------------------------------------------------------
+
+ /**
+ * Takes a snapshot of the partition offsets.
+ *
+ * <p>Important: This method must be called under the checkpoint lock.
+ *
+ * @return A map from partition to current offset.
+ */
+ public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ // this method assumes that the checkpoint lock is held
+ assert Thread.holdsLock(checkpointLock);
+
+ HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
+ for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
+ if (partition.isOffsetDefined()) {
+ state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+ }
+ }
+ return state;
+ }
+
+ /**
+ * Restores the partition offsets.
+ *
+ * @param snapshotState The offsets for the partitions
+ */
+ public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+ for (KafkaTopicPartitionState<?> partition : allPartitions) {
+ Long offset = snapshotState.get(partition.getKafkaTopicPartition());
+ if (offset != null) {
+ partition.setOffset(offset);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // emitting records
+ // ------------------------------------------------------------------------
+
+ /**
+ * Emits a record and updates the offset of the originating partition, atomically
+ * under the checkpoint lock. Depending on the watermark mode, this may also attach
+ * a timestamp to the record and generate watermarks.
+ *
+ * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+ * That makes the fast path efficient, the extended paths are called as separate methods.
+ *
+ * @param record The record to emit
+ * @param partitionState The state of the Kafka partition from which the record was fetched
+ * @param offset The offset from which the record was fetched
+ */
+ protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) {
+ if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+ // fast path logic, in case there are no watermarks
+
+ // emit the record, using the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collect(record);
+ partitionState.setOffset(offset);
+ }
+ }
+ else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+ emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
+ }
+ else {
+ emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
+ }
+ }
+
+ /**
+ * Record emission for the case that a timestamp is attached by an assigner
+ * that is also a periodic watermark generator.
+ */
+ private void emitRecordWithTimestampAndPeriodicWatermark(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+ {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+ // extract timestamp - this accesses/modifies the per-partition state inside the
+ // watermark generator instance, so we need to lock the access on the
+ // partition state. concurrent access can happen from the periodic emitter
+ final long timestamp;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (withWatermarksState) {
+ timestamp = withWatermarksState.getTimestampForRecord(record);
+ }
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+ }
+
+ /**
+ * Record emission for the case that a timestamp is attached by an assigner
+ * that is also a punctuated watermark generator.
+ */
+ private void emitRecordWithTimestampAndPunctuatedWatermark(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+ {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+ // only one thread ever works on accessing timestamps and watermarks
+ // from the punctuated extractor
+ final long timestamp = withWatermarksState.getTimestampForRecord(record);
+ final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+
+ // if we also have a new per-partition watermark, check if that is also a
+ // new cross-partition watermark
+ if (newWatermark != null) {
+ updateMinPunctuatedWatermark(newWatermark);
+ }
+ }
+
+ /**
+ * Checks whether a new per-partition watermark is also a new cross-partition watermark.
+ */
+ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+ if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+ long newMin = Long.MAX_VALUE;
+
+ for (KafkaTopicPartitionState<?> state : allPartitions) {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+
+ newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
+ }
+
+ // double-check locking pattern
+ if (newMin > maxWatermarkSoFar) {
+ synchronized (checkpointLock) {
+ if (newMin > maxWatermarkSoFar) {
+ maxWatermarkSoFar = newMin;
+ sourceContext.emitWatermark(new Watermark(newMin));
+ }
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Utility method that takes the topic partitions and creates the topic partition state
+ * holders. If a watermark generator per partition exists, this will also initialize those.
+ */
+ private KafkaTopicPartitionState<KPH>[] initializePartitions(
+ List<KafkaTopicPartition> assignedPartitions,
+ int timestampWatermarkMode,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+ ClassLoader userCodeClassLoader)
+ throws IOException, ClassNotFoundException
+ {
+ @SuppressWarnings("unchecked")
+ KafkaTopicPartitionState<KPH>[] partitions =
+ (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+ int pos = 0;
+ for (KafkaTopicPartition partition : assignedPartitions) {
+ // create the kafka version specific partition handle
+ KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
+ // create the partition state
+ KafkaTopicPartitionState<KPH> partitionState;
+ switch (timestampWatermarkMode) {
+ case NO_TIMESTAMPS_WATERMARKS:
+ partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+ break;
+ case PERIODIC_WATERMARKS: {
+ AssignerWithPeriodicWatermarks<T> assignerInstance =
+ watermarksPeriodic.deserializeValue(userCodeClassLoader);
+ partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+ partition, kafkaHandle, assignerInstance);
+ break;
+ }
+
+ case PUNCTUATED_WATERMARKS: {
+ AssignerWithPunctuatedWatermarks<T> assignerInstance =
+ watermarksPunctuated.deserializeValue(userCodeClassLoader);
+ partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+ partition, kafkaHandle, assignerInstance);
+ break;
+ }
+ default:
+ // cannot happen, add this as a guard for the future
+ throw new RuntimeException("Unknown timestamp/watermark mode: " + timestampWatermarkMode);
+ }
+
+ partitions[pos++] = partitionState;
+ }
+
+ return partitions;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The periodic watermark emitter. At its configured interval, it checks all partitions
+ * for the current event-time watermark, and possibly emits the next watermark.
+ */
+ private static class PeriodicWatermarkEmitter implements Triggerable {
+
+ private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
+
+ private final SourceContext<?> emitter;
+
+ private final StreamingRuntimeContext triggerContext;
+
+ private final long interval;
+
+ private long lastWatermarkTimestamp;
+
+ //-------------------------------------------------
+
+ PeriodicWatermarkEmitter(
+ KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+ SourceContext<?> emitter,
+ StreamingRuntimeContext runtimeContext)
+ {
+ this.allPartitions = checkNotNull(allPartitions);
+ this.emitter = checkNotNull(emitter);
+ this.triggerContext = checkNotNull(runtimeContext);
+ this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+ this.lastWatermarkTimestamp = Long.MIN_VALUE;
+ }
+
+ //-------------------------------------------------
+
+ public void start() {
+ triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+ }
+
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ // sanity check
+ assert Thread.holdsLock(emitter.getCheckpointLock());
+
+ long minAcrossAll = Long.MAX_VALUE;
+ for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
+
+ // we access the current watermark for the periodic assigners under the state
+ // lock, to prevent concurrent modification to any internal variables
+ final long curr;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (state) {
+ curr = state.getCurrentWatermarkTimestamp();
+ }
+
+ minAcrossAll = Math.min(minAcrossAll, curr);
+ }
+
+ // emit next watermark, if there is one
+ if (minAcrossAll > lastWatermarkTimestamp) {
+ lastWatermarkTimestamp = minAcrossAll;
+ emitter.emitWatermark(new Watermark(minAcrossAll));
+ }
+
+ // schedule the next watermark
+ triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+ }
+ }
+}
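
Both watermark paths above follow the same pattern: track a watermark per partition
and only emit the minimum across all partitions once that minimum advances (the
punctuated path in updateMinPunctuatedWatermark(), the periodic path in
PeriodicWatermarkEmitter.trigger()). A standalone sketch of that pattern, with all
names hypothetical:

    import java.util.Arrays;

    public class MinWatermarkTracker {

        private final long[] perPartition; // current watermark per partition
        private long lastEmitted = Long.MIN_VALUE;

        MinWatermarkTracker(int numPartitions) {
            perPartition = new long[numPartitions];
            Arrays.fill(perPartition, Long.MIN_VALUE);
        }

        /** Updates one partition's watermark and returns the advanced
         * cross-partition watermark, or null if the minimum did not move. */
        Long advance(int partition, long watermark) {
            perPartition[partition] = Math.max(perPartition[partition], watermark);
            long min = Long.MAX_VALUE;
            for (long w : perPartition) {
                min = Math.min(min, w);
            }
            if (min > lastEmitted) {
                lastEmitted = min;
                return min; // the fetcher would emit new Watermark(min) here
            }
            return null;
        }

        public static void main(String[] args) {
            MinWatermarkTracker tracker = new MinWatermarkTracker(2);
            System.out.println(tracker.advance(0, 100)); // null: partition 1 still at Long.MIN_VALUE
            System.out.println(tracker.advance(1, 50));  // 50: the cross-partition minimum advanced
        }
    }

This also shows why a single partition that never advances holds back the
cross-partition watermark.
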
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..9a0e4e3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that transports an exception from one thread (for example, a fetcher's
+ * consumer thread) to another thread, optionally interrupting the target thread so
+ * that it notices the error promptly. Only the first reported exception is kept.
+ */
+public class ExceptionProxy {
+
+ /** The thread that should be interrupted when an exception occurs */
+ private final Thread toInterrupt;
+
+ /** The exception to throw */
+ private final AtomicReference<Throwable> exception;
+
+ /**
+ * Creates an exception proxy that, when an exception is reported, interrupts
+ * the given thread (if non-null).
+ *
+ * @param toInterrupt The thread to interrupt upon an exception. May be null.
+ */
+ public ExceptionProxy(@Nullable Thread toInterrupt) {
+ this.toInterrupt = toInterrupt;
+ this.exception = new AtomicReference<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the exception occurred and interrupts the target thread,
+ * if no other exception has occurred so far.
+ *
+ * @param t The exception that occurred
+ */
+ public void reportError(Throwable t) {
+ // set the exception, if it is the first
+ if (exception.compareAndSet(null, t) && toInterrupt != null) {
+ toInterrupt.interrupt();
+ }
+ }
+
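+ /**
+ * Checks whether an exception was reported so far and, if so, rethrows it in the
+ * calling thread: Exceptions and Errors are rethrown as-is, other Throwables are
+ * wrapped in an Exception.
+ *
+ * @throws Exception The reported exception, if any is present.
+ */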
+ public void checkAndThrowException() throws Exception {
+ Throwable t = exception.get();
+ if (t != null) {
+ if (t instanceof Exception) {
+ throw (Exception) t;
+ }
+ else if (t instanceof Error) {
+ throw (Error) t;
+ }
+ else {
+ throw new Exception(t);
+ }
+ }
+ }
+}
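
A typical use of the proxy: a worker thread reports its failure, and the blocked
main thread, woken up by the interrupt, rethrows it. A minimal sketch (the worker
body and the sleep that stands in for the fetch loop are made up for the example):

    import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;

    public class ExceptionProxyExample {

        public static void main(String[] args) throws Exception {
            final ExceptionProxy proxy = new ExceptionProxy(Thread.currentThread());

            Thread worker = new Thread(() -> {
                try {
                    throw new IllegalStateException("simulated consumer failure");
                } catch (Throwable t) {
                    proxy.reportError(t); // first reported error wins, main thread is interrupted
                }
            });
            worker.start();

            try {
                Thread.sleep(10_000); // stands in for the fetcher's main work loop
            } catch (InterruptedException ignored) {
                // the interrupt most likely came from reportError()
            }
            proxy.checkAndThrowException(); // rethrows the IllegalStateException
        }
    }
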
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
deleted file mode 100644
index 11a392a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-
-public class KafkaPartitionState implements Serializable {
-
- private static final long serialVersionUID = 722083576322742328L;
-
- private final int partitionID;
- private long offset;
-
- private long maxTimestamp = Long.MIN_VALUE;
- private boolean isActive = false;
-
- public KafkaPartitionState(int id, long offset) {
- this.partitionID = id;
- this.offset = offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public void setActive(boolean isActive) {
- this.isActive = isActive;
- }
-
- public void setMaxTimestamp(long timestamp) {
- maxTimestamp = timestamp;
- }
-
- public int getPartition() {
- return partitionID;
- }
-
- public boolean isActive() {
- return isActive;
- }
-
- public long getMaxTimestamp() {
- return maxTimestamp;
- }
-
- public long getOffset() {
- return offset;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index aea14cf..c68fe28 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -24,14 +24,20 @@ import java.util.Map;
import static java.util.Objects.requireNonNull;
-
/**
- * A serializable representation of a kafka topic and a partition.
- * Used as an operator state for the Kafka consumer
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
+ *
+ * <p>Note: This class must not change in its structure, because it would change the
+ * serialization format and make previous savepoints unreadable.
*/
-public class KafkaTopicPartition implements Serializable {
+public final class KafkaTopicPartition implements Serializable {
+ /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+ * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
private static final long serialVersionUID = 722083576322742325L;
+
+ // ------------------------------------------------------------------------
private final String topic;
private final int partition;
@@ -43,6 +49,8 @@ public class KafkaTopicPartition implements Serializable {
this.cachedHash = 31 * topic.hashCode() + partition;
}
+ // ------------------------------------------------------------------------
+
public String getTopic() {
return topic;
}
@@ -51,6 +59,8 @@ public class KafkaTopicPartition implements Serializable {
return partition;
}
+ // ------------------------------------------------------------------------
+
@Override
public String toString() {
return "KafkaTopicPartition{" +
@@ -64,25 +74,23 @@ public class KafkaTopicPartition implements Serializable {
if (this == o) {
return true;
}
- if (!(o instanceof KafkaTopicPartition)) {
- return false;
+ else if (o instanceof KafkaTopicPartition) {
+ KafkaTopicPartition that = (KafkaTopicPartition) o;
+ return this.partition == that.partition && this.topic.equals(that.topic);
}
-
- KafkaTopicPartition that = (KafkaTopicPartition) o;
-
- if (partition != that.partition) {
+ else {
return false;
}
- return topic.equals(that.topic);
}
@Override
public int hashCode() {
return cachedHash;
}
-
-
- // ------------------- Utilities -------------------------------------
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
public static String toString(Map<KafkaTopicPartition, Long> map) {
StringBuilder sb = new StringBuilder();
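
Since KafkaTopicPartition serves as a key in the offset maps (for example in
commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long>)), the equals()/hashCode()
pair with its precomputed hash is what makes such lookups work. A small sketch,
assuming the two-argument constructor from the hunk above:

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionKeyExample {
        public static void main(String[] args) {
            Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new KafkaTopicPartition("events", 0), 42L);

            // a structurally equal instance finds the entry
            System.out.println(offsets.get(new KafkaTopicPartition("events", 0))); // 42
        }
    }
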
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..36612a4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * The state that the Flink Kafka Consumer holds for each Kafka partition.
+ * Includes the Kafka descriptor for partitions.
+ *
+ * <p>This class describes the most basic state (only the offset), subclasses
+ * define more elaborate state, containing current watermarks and timestamp
+ * extractors.
+ *
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+ /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+ * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+ public static final long OFFSET_NOT_SET = -915623761776L;
+
+ // ------------------------------------------------------------------------
+
+ /** The Flink description of a Kafka partition */
+ private final KafkaTopicPartition partition;
+
+ /** The Kafka description of a Kafka partition (varies across different Kafka versions) */
+ private final KPH kafkaPartitionHandle;
+
+ /** The offset within the Kafka partition that we already processed */
+ private volatile long offset;
+
+ // ------------------------------------------------------------------------
+
+ public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
+ this.partition = partition;
+ this.kafkaPartitionHandle = kafkaPartitionHandle;
+ this.offset = OFFSET_NOT_SET;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets Flink's descriptor for the Kafka Partition.
+ * @return The Flink partition descriptor.
+ */
+ public final KafkaTopicPartition getKafkaTopicPartition() {
+ return partition;
+ }
+
+ /**
+ * Gets Kafka's descriptor for the Kafka Partition.
+ * @return The Kafka partition descriptor.
+ */
+ public final KPH getKafkaPartitionHandle() {
+ return kafkaPartitionHandle;
+ }
+
+ public final String getTopic() {
+ return partition.getTopic();
+ }
+
+ public final int getPartition() {
+ return partition.getPartition();
+ }
+
+ /**
+ * The current offset in the partition. This refers to the offset of the last element that
+ * we retrieved and emitted successfully. It is the offset that should be stored in
+ * a checkpoint.
+ */
+ public final long getOffset() {
+ return offset;
+ }
+
+ public final void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public final boolean isOffsetDefined() {
+ return offset != OFFSET_NOT_SET;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
+ + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
+ }
+}
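
The OFFSET_NOT_SET sentinel gives new partition state a well-defined "undefined"
phase: snapshotCurrentState() in AbstractFetcher skips such partitions. A short
sketch of that life cycle (a String stands in for the version-specific handle KPH):

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;

    public class PartitionStateExample {
        public static void main(String[] args) {
            KafkaTopicPartitionState<String> state = new KafkaTopicPartitionState<>(
                    new KafkaTopicPartition("events", 0), "events-0");

            System.out.println(state.isOffsetDefined()); // false -> not snapshotted

            state.setOffset(100L); // set under the checkpoint lock once a record is emitted
            System.out.println(state.getOffset());       // 100
        }
    }
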