You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/22 12:01:55 UTC
[2/6] flink git commit: [FLINK-7143] [kafka] Fix detection of
restored bit in Kafka Consumer
[FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer
Before, the problem was that empty state was associated with the source
not being restored. However, a source can have empty restored state in
one of two cases:
1. The source was not restored.
2. The overall job was restored but the source simply didn't get any
operator state assigned.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0564322
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0564322
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0564322
Branch: refs/heads/release-1.3
Commit: b0564322c61168c3a7bb23bdca3db0648454a691
Parents: 6e0d90c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 17 19:06:09 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 22 19:36:37 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumerBase.java | 17 ++++++++++++-----
.../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 11 ++++-------
2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0564322/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 8dbca72..d9b75bb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -142,6 +142,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/** Flag indicating whether the consumer is still running **/
private volatile boolean running = true;
+ /** Whether this operator instance was restored from checkpointed state. */
+ private transient boolean restored = false;
+
// ------------------------------------------------------------------------
/**
@@ -354,7 +357,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
- if (restoredState != null) {
+ if (restored) {
for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
if (restoredState.containsKey(kafkaTopicPartition)) {
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
@@ -503,6 +506,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
+ // we might have been restored via restoreState() which restores from legacy operator state
+ if (!restored) {
+ restored = context.isRestored();
+ }
+
OperatorStateStore stateStore = context.getOperatorStateStore();
offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
@@ -518,9 +526,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.debug("Using the following offsets: {}", restoredState);
}
}
- if (restoredState != null && restoredState.isEmpty()) {
- restoredState = null;
- }
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
@@ -576,7 +581,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
- restoredState = restoredOffsets.isEmpty() ? null : restoredOffsets;
+ restoredState = restoredOffsets.isEmpty()
+ ? new HashMap<KafkaTopicPartition, Long>() : restoredOffsets;
+ restored = true;
if (LOG.isDebugEnabled()) {
LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
http://git-wip-us.apache.org/repos/asf/flink/blob/b0564322/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 70e60f3..e7822a7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -202,7 +202,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
// assert that no state was restored
- assertTrue(consumerFunction.getRestoredState() == null);
+ assertTrue(consumerFunction.getRestoredState().isEmpty());
consumerOperator.close();
consumerOperator.cancel();
@@ -244,12 +244,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
- // assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
-
- assertTrue(consumerFunction.getRestoredState() == null);
+ // verify that we do not try to fetch the partitions list and subscribe to them even though we have empty state
+ assertTrue(consumerFunction.getRestoredState().isEmpty());
+ assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
consumerOperator.close();
consumerOperator.cancel();