Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/22 12:01:57 UTC

[4/6] flink git commit: [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling

[FLINK-7143] [kafka] Add test for Kafka Consumer rescaling

This verifies that the consumer always knows whether it was restored or
started fresh, and that a restored consumer is not affected by changes
in the partitions reported by Kafka.

Previously, reshuffling of operator state on restore could lead to the
same partition being subscribed to by multiple subtasks.
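
For illustration, here is a minimal sketch of the intended assignment
logic (hypothetical names and types, not the actual FlinkKafkaConsumerBase
code): a restored subtask takes over exactly the partitions in its
restored state, while a fresh subtask deterministically claims a disjoint
share of the partitions reported by Kafka.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionAssignmentSketch {

        // sentinel for "no start offset determined yet"; the value is arbitrary here
        static final long OFFSET_NOT_SET = Long.MIN_VALUE;

        // Returns the partitions this subtask should subscribe to. If
        // restoredState is non-null, the freshly fetched partition list is
        // ignored entirely, so a restored source is impervious to changes
        // reported by Kafka.
        static Map<String, Long> assignPartitions(
                Map<String, Long> restoredState,  // partition -> offset; null on a fresh start
                List<String> fetchedPartitions,   // partitions currently reported by Kafka
                int numSubtasks,
                int subtaskIndex) {

            Map<String, Long> subscribed = new HashMap<>();
            if (restoredState != null) {
                // restored run: take over exactly the restored partitions and offsets
                subscribed.putAll(restoredState);
            } else {
                // fresh run: deterministic round-robin split, so no two subtasks
                // ever claim the same partition
                for (int i = 0; i < fetchedPartitions.size(); i++) {
                    if (i % numSubtasks == subtaskIndex) {
                        subscribed.put(fetchedPartitions.get(i), OFFSET_NOT_SET);
                    }
                }
            }
            return subscribed;
        }
    }

The test below checks exactly these two invariants: a disjoint, complete
assignment on a fresh start, and an unchanged assignment after restore.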


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3369cfe2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3369cfe2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3369cfe2

Branch: refs/heads/release-1.3
Commit: 3369cfe200bf2cb7ed04caf19d8599075e4cfe21
Parents: ead25aa
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jul 18 11:57:46 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 22 19:37:49 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 164 ++++++++++++++++++-
 1 file changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3369cfe2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ccf2ed2..31883ce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,16 +24,23 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -51,8 +58,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsIn.isIn;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.collection.IsMapContaining.hasKey;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyMap;
@@ -532,6 +547,118 @@ public class FlinkKafkaConsumerBaseTest {
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
 	}
 
+	@Test
+	public void testScaleUp() throws Exception {
+		testRescaling(5, 2, 15, 1000);
+	}
+
+	@Test
+	public void testScaleDown() throws Exception {
+		testRescaling(5, 10, 2, 1);
+	}
+
+	/**
+	 * Tests whether the Kafka consumer behaves correctly when scaling the parallelism up/down,
+	 * which means that operator state is being reshuffled.
+	 *
+	 * <p>This also verifies that a restoring source is always impervious to changes in the list
+	 * of partitions reported by Kafka.
+	 */
+	void testRescaling(
+		final int initialParallelism,
+		final int numPartitions,
+		final int restoredParallelism,
+		final int restoredNumPartitions) throws Exception {
+
+		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = new ArrayList<>();
+		for (int i = 0; i < numPartitions; i++) {
+			mockFetchedPartitionsOnStartup.add(new KafkaTopicPartition("test-topic", i));
+		}
+
+		DummyFlinkKafkaConsumer<String>[] consumers =
+			new DummyFlinkKafkaConsumer[initialParallelism];
+
+		AbstractStreamOperatorTestHarness<String>[] testHarnesses =
+			new AbstractStreamOperatorTestHarness[initialParallelism];
+
+		for (int i = 0; i < initialParallelism; i++) {
+			consumers[i] = new DummyFlinkKafkaConsumer<>(
+				mockFetchedPartitionsOnStartup);
+			testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
+
+			// initializeState() is always called, null signals that we didn't restore
+			testHarnesses[i].initializeState(null);
+			testHarnesses[i].open();
+		}
+
+		Map<KafkaTopicPartition, Long> globalSubscribedPartitions = new HashMap<>();
+
+		for (int i = 0; i < initialParallelism; i++) {
+			Map<KafkaTopicPartition, Long> subscribedPartitions =
+				consumers[i].getSubscribedPartitionsToStartOffsets();
+
+			// make sure that no one else is subscribed to these partitions
+			for (KafkaTopicPartition partition : subscribedPartitions.keySet()) {
+				assertThat(globalSubscribedPartitions, not(hasKey(partition)));
+			}
+			globalSubscribedPartitions.putAll(subscribedPartitions);
+		}
+
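+		// together with the duplicate check above: every partition is covered by exactly one subtask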
+		assertThat(globalSubscribedPartitions.values(), hasSize(numPartitions));
+		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(globalSubscribedPartitions.keySet())));
+
+		OperatorStateHandles[] state = new OperatorStateHandles[initialParallelism];
+
+		for (int i = 0; i < initialParallelism; i++) {
+			state[i] = testHarnesses[i].snapshot(0, 0);
+		}
+
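+		// merge the per-subtask snapshots into a single state object, as a rescaled restore would see it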
+		OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.repackageState(state);
+
+		// -----------------------------------------------------------------------------------------
+		// restore
+
+		List<KafkaTopicPartition> mockFetchedPartitionsAfterRestore = new ArrayList<>();
+		for (int i = 0; i < restoredNumPartitions; i++) {
+			mockFetchedPartitionsAfterRestore.add(new KafkaTopicPartition("test-topic", i));
+		}
+
+		DummyFlinkKafkaConsumer<String>[] restoredConsumers =
+			new DummyFlinkKafkaConsumer[restoredParallelism];
+
+		AbstractStreamOperatorTestHarness<String>[] restoredTestHarnesses =
+			new AbstractStreamOperatorTestHarness[restoredParallelism];
+
+		for (int i = 0; i < restoredParallelism; i++) {
+			restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(
+				mockFetchedPartitionsAfterRestore);
+			restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i);
+
+			// initializeState() is always called; this time we restore from the merged snapshot state
+			restoredTestHarnesses[i].initializeState(mergedState);
+			restoredTestHarnesses[i].open();
+		}
+
+		Map<KafkaTopicPartition, Long> restoredGlobalSubscribedPartitions = new HashMap<>();
+
+		for (int i = 0; i < restoredParallelism; i++) {
+			Map<KafkaTopicPartition, Long> subscribedPartitions =
+				restoredConsumers[i].getSubscribedPartitionsToStartOffsets();
+
+			// make sure that no one else is subscribed to these partitions
+			for (KafkaTopicPartition partition : subscribedPartitions.keySet()) {
+				assertThat(restoredGlobalSubscribedPartitions, not(hasKey(partition)));
+			}
+			restoredGlobalSubscribedPartitions.putAll(subscribedPartitions);
+		}
+
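+		// the restored consumers must still cover exactly the partitions from the initial run;
+		// the changed partition list reported by Kafka after the restore has no effect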
+		assertThat(restoredGlobalSubscribedPartitions.values(), hasSize(numPartitions));
+		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(restoredGlobalSubscribedPartitions.keySet())));
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
@@ -557,6 +684,19 @@ public class FlinkKafkaConsumerBaseTest {
 		return consumer;
 	}
 
+	private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(
+		SourceFunction<T> source, int numSubtasks, int subtaskIndex) throws Exception {
+
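+		// one harness per parallel subtask; Short.MAX_VALUE / 2 serves as a fixed max parallelism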
+		AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(
+				new StreamSource<>(source), Short.MAX_VALUE / 2, numSubtasks, subtaskIndex);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		return testHarness;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
@@ -564,26 +704,33 @@ public class FlinkKafkaConsumerBaseTest {
 
 		boolean isAutoCommitEnabled = false;
 
-		@SuppressWarnings("unchecked")
+		private List<KafkaTopicPartition> mockFetchedPartitions;
+
 		public DummyFlinkKafkaConsumer() {
+			this(Collections.<KafkaTopicPartition>emptyList());
+		}
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer(List<KafkaTopicPartition> mockFetchedPartitions) {
 			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+			this.mockFetchedPartitions = Preconditions.checkNotNull(mockFetchedPartitions);
 		}
 
 		@Override
 		@SuppressWarnings("unchecked")
 		protected AbstractFetcher<T, ?> createFetcher(
-				SourceContext<T> sourceContext,
-				Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
-				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+			SourceContext<T> sourceContext,
+			Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode) throws Exception {
 			return mock(AbstractFetcher.class);
 		}
 
 		@Override
 		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return Collections.emptyList();
+			return mockFetchedPartitions;
 		}
 
 		@Override
@@ -595,7 +742,6 @@ public class FlinkKafkaConsumerBaseTest {
 			this.isAutoCommitEnabled = isAutoCommitEnabled;
 		}
 	}
-
 	private static final class TestingListState<T> implements ListState<T> {
 
 		private final List<T> list = new ArrayList<>();