You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:19 UTC
[04/15] flink git commit: [FLINK-5293] Make Kafka consumer backwards
compatible with 1.1 snapshots
[FLINK-5293] Make Kafka consumer backwards compatible with 1.1 snapshots
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/216653ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/216653ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/216653ad
Branch: refs/heads/master
Commit: 216653ad5864302dbcda4be5b88a83c4c039a05c
Parents: 49f1a03
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:46:33 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumerBase.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/216653ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index aef7116..13bfe93 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -65,7 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
- CheckpointedFunction {
+ CheckpointedFunction,
+ CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
+
private static final long serialVersionUID = -6272159445203409112L;
protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
@@ -382,6 +385,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
@Override
+ public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+ LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+ restoreToOffset = restoredOffsets;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
+ }
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
LOG.debug("notifyCheckpointComplete() called on closed source");