You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/22 12:01:55 UTC

[2/6] flink git commit: [FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer

[FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer

Before, the problem was that empty state was associated with the source
not being restored. However, a source can have empty restored state in
one of two cases:

1. The source was not restored.
2. The overall job was restored but the source simply didn't get any
operator state assigned.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0564322
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0564322
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0564322

Branch: refs/heads/release-1.3
Commit: b0564322c61168c3a7bb23bdca3db0648454a691
Parents: 6e0d90c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 17 19:06:09 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 22 19:36:37 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java   | 17 ++++++++++++-----
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 11 ++++-------
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0564322/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 8dbca72..d9b75bb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -142,6 +142,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/** Flag indicating whether the consumer is still running **/
 	private volatile boolean running = true;
 
+	/** Whether this operator instance was restored from checkpointed state. */
+	private transient boolean restored = false;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -354,7 +357,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
 
-		if (restoredState != null) {
+		if (restored) {
 			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
 				if (restoredState.containsKey(kafkaTopicPartition)) {
 					subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
@@ -503,6 +506,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 
+		// we might have been restored via restoreState() which restores from legacy operator state
+		if (!restored) {
+			restored = context.isRestored();
+		}
+
 		OperatorStateStore stateStore = context.getOperatorStateStore();
 		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
@@ -518,9 +526,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					LOG.debug("Using the following offsets: {}", restoredState);
 				}
 			}
-			if (restoredState != null && restoredState.isEmpty()) {
-				restoredState = null;
-			}
 		} else {
 			LOG.info("No restore state for FlinkKafkaConsumer.");
 		}
@@ -576,7 +581,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
 			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
 
-		restoredState = restoredOffsets.isEmpty() ? null : restoredOffsets;
+		restoredState = restoredOffsets.isEmpty()
+			? new HashMap<KafkaTopicPartition, Long>() : restoredOffsets;
+		restored = true;
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",

http://git-wip-us.apache.org/repos/asf/flink/blob/b0564322/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 70e60f3..e7822a7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -202,7 +202,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
 		// assert that no state was restored
-		assertTrue(consumerFunction.getRestoredState() == null);
+		assertTrue(consumerFunction.getRestoredState().isEmpty());
 
 		consumerOperator.close();
 		consumerOperator.cancel();
@@ -244,12 +244,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 			expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
 		}
 
-		// assert that there are partitions and is identical to expected list
-		assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-		assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-		Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets());
-
-		assertTrue(consumerFunction.getRestoredState() == null);
+		// verify that we do not try to fetch the partitions list and subscribe to them even though we have empty state
+		assertTrue(consumerFunction.getRestoredState().isEmpty());
+		assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
 		consumerOperator.close();
 		consumerOperator.cancel();