Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/14 17:10:41 UTC

[2/2] flink git commit: [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

[FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

Previously, the Kafka consumer queried the partition list on restore
and then used it to filter out restored state of partitions that do
not exist in that list.

If the returned partition list is incomplete (i.e. missing partitions
that existed before, perhaps due to temporary ZK / broker downtime),
then the state of the missing partitions is dropped and cannot be
recovered anymore.
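
Roughly, the old restore path behaved like the sketch below (a
simplified illustration mirroring the removed lines in the diff
further down; the standalone method name is made up for the sketch):

    // Simplified sketch of the old restore-time filtering. Fields such as
    // restoreToOffset and subscribedPartitions are assumed to exist as in
    // FlinkKafkaConsumerBase.
    private void assignTopicPartitionsOld(List<KafkaTopicPartition> queriedPartitions) {
        subscribedPartitions = new ArrayList<>();
        for (KafkaTopicPartition partition : queriedPartitions) {
            // Only partitions present in the queried list survive the restore;
            // restored offsets for partitions missing from the list are silently dropped.
            if (restoreToOffset.containsKey(partition)) {
                subscribedPartitions.add(partition);
            }
        }
    }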

This commit fixes this by always restoring the complete state, without
any filtering. We simply let the consumer fail if partitions assigned
to the consuming threads / Kafka clients are unreachable when the
consumer starts running.
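
A condensed sketch of the two new code paths (adapted directly from
the diff below; names follow the patch):

    // Consumer side: take the restored state as-is, without filtering it
    // against the queried partition list.
    if (restoreToOffset != null) {
        subscribedPartitions = new ArrayList<>(restoreToOffset.size());
        for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) {
            subscribedPartitions.add(restoredPartitionState.getKey());
        }
    }

    // Fetcher side: fail fast if the restored offsets do not exactly cover
    // the subscribed partitions.
    if (restoredOffsets.size() != allPartitions.length) {
        throw new IllegalStateException(
            "The fetcher was restored with partition offsets that do not " +
                "match with the subscribed partitions: " + restoredOffsets);
    }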

This closes #3505.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24306ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24306ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24306ada

Branch: refs/heads/release-1.2
Commit: 24306adabdda28620395b2fa93caa86a13731258
Parents: 0c532ed
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Mar 10 14:59:34 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 01:08:52 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           |  27 +++--
 .../kafka/internals/AbstractFetcher.java        |  27 ++++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 111 +++++++++++++++----
 3 files changed, 125 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24306ada/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 2918080..bfc347f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -293,11 +294,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	@Override
 	public void open(Configuration configuration) {
-		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
-
-		if (kafkaTopicPartitions != null) {
-			assignTopicPartitions(kafkaTopicPartitions);
-		}
+		assignTopicPartitions();
 	}
 
 	@Override
@@ -489,16 +486,15 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
-		subscribedPartitions = new ArrayList<>();
-
+	private void assignTopicPartitions() {
 		if (restoreToOffset != null) {
-			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
-					subscribedPartitions.add(kafkaTopicPartition);
-				}
+			subscribedPartitions = new ArrayList<>(restoreToOffset.size());
+			for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) {
+				subscribedPartitions.add(restoredPartitionState.getKey());
 			}
 		} else {
+			List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
 			Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
 				@Override
 				public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
@@ -512,6 +508,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 			});
 
+			subscribedPartitions = new ArrayList<>(
+				(kafkaTopicPartitions.size() / getRuntimeContext().getNumberOfParallelSubtasks()) + 1);
 			for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
 				subscribedPartitions.add(kafkaTopicPartitions.get(i));
 			}
@@ -569,4 +567,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		
 		logger.info(sb.toString());
 	}
+
+	@VisibleForTesting
+	List<KafkaTopicPartition> getSubscribedPartitions() {
+		return subscribedPartitions;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24306ada/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 821eb03..83ade97 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -194,14 +194,29 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/**
 	 * Restores the partition offsets.
+	 * The partitions in the provided map of restored partitions to offsets must completely match
+	 * the fetcher's subscribed partitions.
 	 * 
-	 * @param snapshotState The offsets for the partitions 
+	 * @param restoredOffsets The restored offsets for the partitions
+	 *
+	 * @throws IllegalStateException if the partitions in the provided restored offsets map
+	 * cannot completely match the fetcher's subscribed partitions.
 	 */
-	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<?> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
+		if (restoredOffsets.size() != allPartitions.length) {
+			throw new IllegalStateException(
+				"The fetcher was restored with partition offsets that do not " +
+					"match with the subscribed partitions: " + restoredOffsets);
+		} else {
+			for (KafkaTopicPartitionState<?> partition : allPartitions) {
+				Long offset = restoredOffsets.get(partition.getKafkaTopicPartition());
+				if (offset != null) {
+					partition.setOffset(offset);
+				} else {
+					throw new IllegalStateException(
+						"The fetcher was restored with partition offsets that do not " +
+							"contain offsets for subscribed partition " + partition.getKafkaTopicPartition());
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/24306ada/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b96ba30..3b67a07 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -37,8 +40,6 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -48,15 +49,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class FlinkKafkaConsumerBaseTest {
@@ -65,14 +66,17 @@ public class FlinkKafkaConsumerBaseTest {
 	 * Tests that not both types of timestamp extractors / watermark generators can be used.
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
+			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), Collections.<KafkaTopicPartition>emptyList())
+					.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
+			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), Collections.<KafkaTopicPartition>emptyList())
+					.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 		
@@ -81,14 +85,14 @@ public class FlinkKafkaConsumerBaseTest {
 		@SuppressWarnings("unchecked")
 		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 		
-		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), Collections.<KafkaTopicPartition>emptyList());
 		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
 			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 
-		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), Collections.<KafkaTopicPartition>emptyList());
 		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
 		try {
 			c2.assignTimestampsAndWatermarks(periodicAssigner);
@@ -186,9 +190,10 @@ public class FlinkKafkaConsumerBaseTest {
 	 * Tests that on snapshots, states and offsets to commit to Kafka are correct
 	 */
 	@Test
+	@SuppressWarnings("unchecked")
 	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
 
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(mock(AbstractFetcher.class), new LinkedMap(), true);
 		List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
 		partitionList.add(new KafkaTopicPartition("test", 0));
 		consumer.setSubscribedPartitions(partitionList);
@@ -207,6 +212,66 @@ public class FlinkKafkaConsumerBaseTest {
 		consumer.run(mock(SourceFunction.SourceContext.class));
 	}
 
+	/**
+	 * Tests that the fetcher is restored with all partitions in the restored state,
+	 * regardless of the queried complete list of Kafka partitions.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testStateIntactOnRestore() throws Exception {
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+				fetcher,
+				Collections.<KafkaTopicPartition>emptyList()); // mock queried list; this should not affect restore state
+
+		final OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		final ListState mockListState = mock(ListState.class);
+		List<Tuple2<KafkaTopicPartition, Long>> restoredPartitionOffsets = new ArrayList<>(3);
+		restoredPartitionOffsets.add(new Tuple2<>(new KafkaTopicPartition("test-topic", 0), 23L));
+		restoredPartitionOffsets.add(new Tuple2<>(new KafkaTopicPartition("test-topic", 1), 40L));
+		restoredPartitionOffsets.add(new Tuple2<>(new KafkaTopicPartition("test-topic", 2), 32L));
+		when(mockListState.get()).thenReturn(restoredPartitionOffsets);
+		when(operatorStateStore.getSerializableListState(Matchers.anyString())).thenReturn(mockListState);
+
+		List<KafkaTopicPartition> expectedSubscribedPartitions = new ArrayList<>(3);
+		expectedSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 0));
+		expectedSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 1));
+		expectedSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 2));
+
+		Map<KafkaTopicPartition, Long> expectedFetcherRestoreState = new HashMap<>();
+		expectedFetcherRestoreState.put(new KafkaTopicPartition("test-topic", 0), 23L);
+		expectedFetcherRestoreState.put(new KafkaTopicPartition("test-topic", 1), 40L);
+		expectedFetcherRestoreState.put(new KafkaTopicPartition("test-topic", 2), 32L);
+
+		consumer.initializeState(new FunctionInitializationContext() {
+			@Override
+			public boolean isRestored() {
+				return true;
+			}
+
+			@Override
+			public OperatorStateStore getOperatorStateStore() {
+				return operatorStateStore;
+			}
+
+			@Override
+			public KeyedStateStore getKeyedStateStore() {
+				throw new UnsupportedOperationException();
+			}
+		});
+
+		consumer.open(new Configuration());
+
+		consumer.run(mock(SourceFunction.SourceContext.class));
+
+		assertEquals(expectedSubscribedPartitions.size(), consumer.getSubscribedPartitions().size());
+		for (KafkaTopicPartition expectedPartition : expectedSubscribedPartitions) {
+			consumer.getSubscribedPartitions().contains(expectedPartition);
+		}
+		verify(fetcher).restoreOffsets(expectedFetcherRestoreState);
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSnapshotState() throws Exception {
@@ -333,7 +398,7 @@ public class FlinkKafkaConsumerBaseTest {
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
 			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
 	{
-		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>(fetcher, Collections.<KafkaTopicPartition>emptyList());
 
 		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
 		fetcherField.setAccessible(true);
@@ -355,27 +420,29 @@ public class FlinkKafkaConsumerBaseTest {
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 		private static final long serialVersionUID = 1L;
 
+		private final AbstractFetcher<T, ?> mockFetcher;
+		private final List<KafkaTopicPartition> mockKafkaPartitions;
+
 		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaConsumer() {
+		public DummyFlinkKafkaConsumer(AbstractFetcher<T, ?> mockFetcher, List<KafkaTopicPartition> mockKafkaPartitions) {
 			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+			this.mockFetcher = mockFetcher;
+			this.mockKafkaPartitions = mockKafkaPartitions;
 		}
 
 		@Override
-		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
-			AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					Assert.fail("Trying to restore offsets even though there was no restore state.");
-					return null;
-				}
-			}).when(fetcher).restoreOffsets(any(HashMap.class));
-			return fetcher;
+		protected AbstractFetcher<T, ?> createFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> thisSubtaskPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				StreamingRuntimeContext runtimeContext) throws Exception {
+			return mockFetcher;
 		}
 
 		@Override
 		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return Collections.emptyList();
+			return mockKafkaPartitions;
 		}
 
 		@Override