You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:19 UTC

[04/15] flink git commit: [FLINK-5293] Make Kafka consumer backwards compatible with 1.1 snapshots

[FLINK-5293] Make Kafka consumer backwards compatible with 1.1 snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/216653ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/216653ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/216653ad

Branch: refs/heads/master
Commit: 216653ad5864302dbcda4be5b88a83c4c039a05c
Parents: 49f1a03
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:46:33 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java  | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/216653ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index aef7116..13bfe93 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -65,7 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
 		CheckpointListener,
 		ResultTypeQueryable<T>,
-		CheckpointedFunction {
+		CheckpointedFunction,
+		CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
+
 	private static final long serialVersionUID = -6272159445203409112L;
 
 	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
@@ -382,6 +385,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
+	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		restoreToOffset = restoredOffsets;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
+		}
+	}
+
+	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");