You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/24 03:55:16 UTC

[3/4] flink git commit: [FLINK-7143] [kafka, tests] Stricter tests for deterministic partition assignment

[FLINK-7143] [kafka, tests] Stricter tests for deterministic partition assignment

Previous test coverage for partition assignment was not strict enough.
It did not verify that partitions are assigned to the exact expected
subtasks, which is crucial for verifying that partition to subtask
assignments are deterministic, and insensitive to the ordering of the
fetched partition metadata.

This closes #4302.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e16e9d99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e16e9d99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e16e9d99

Branch: refs/heads/master
Commit: e16e9d99fded74e2c92083e3734ba94eea927fb0
Parents: c65afdb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Jul 12 02:36:00 2017 +0900
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 10:28:04 2017 +0800

----------------------------------------------------------------------
 .../AbstractPartitionDiscovererTest.java        | 77 ++++++++++++++++++--
 1 file changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e16e9d99/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
index 71c4c1b..b00d74d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
@@ -72,10 +72,10 @@ public class AbstractPartitionDiscovererTest {
 			new KafkaTopicPartition(TEST_TOPIC, 2),
 			new KafkaTopicPartition(TEST_TOPIC, 3));
 
-		for (int i = 0; i < mockGetAllPartitionsForTopicsReturn.size(); i++) {
+		for (int subtaskIndex = 0; subtaskIndex < mockGetAllPartitionsForTopicsReturn.size(); subtaskIndex++) {
 			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 					topicsDescriptor,
-					i,
+					subtaskIndex,
 					mockGetAllPartitionsForTopicsReturn.size(),
 					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -84,6 +84,9 @@ public class AbstractPartitionDiscovererTest {
 			List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
 			assertEquals(1, initialDiscovery.size());
 			assertTrue(contains(mockGetAllPartitionsForTopicsReturn, initialDiscovery.get(0).getPartition()));
+			assertEquals(
+				getExpectedSubtaskIndex(initialDiscovery.get(0), mockGetAllPartitionsForTopicsReturn.size()),
+				subtaskIndex);
 
 			// subsequent discoveries should not find anything
 			List<KafkaTopicPartition> secondDiscovery = partitionDiscoverer.discoverPartitions();
@@ -111,10 +114,10 @@ public class AbstractPartitionDiscovererTest {
 			final int minPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers;
 			final int maxPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers + 1;
 
-			for (int i = 0; i < numConsumers; i++) {
+			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
-						i,
+						subtaskIndex,
 						numConsumers,
 						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -127,6 +130,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
+					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -159,10 +163,10 @@ public class AbstractPartitionDiscovererTest {
 
 			final int numConsumers = 2 * mockGetAllPartitionsForTopicsReturn.size() + 3;
 
-			for (int i = 0; i < numConsumers; i++) {
+			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
-						i,
+						subtaskIndex,
 						numConsumers,
 						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -174,6 +178,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
+					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -255,16 +260,19 @@ public class AbstractPartitionDiscovererTest {
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -291,26 +299,32 @@ public class AbstractPartitionDiscovererTest {
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -322,6 +336,53 @@ public class AbstractPartitionDiscovererTest {
 		}
 	}
 
+	@Test
+	public void testDeterministicAssignmentWithDifferentFetchedPartitionOrdering() throws Exception {
+		int numSubtasks = 4;
+		// fetched partition metadata in its natural order; input of the first discoverer of each subtask
+		List<KafkaTopicPartition> mockGetAllPartitionsForTopicsReturn = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2),
+			new KafkaTopicPartition("test-topic", 3),
+			new KafkaTopicPartition("test-topic2", 0),
+			new KafkaTopicPartition("test-topic2", 1));
+		// the same six partitions in shuffled order; input of the second discoverer of each subtask
+		List<KafkaTopicPartition> mockGetAllPartitionsForTopicsReturnOutOfOrder = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 3),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic2", 1),
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic2", 0),
+			new KafkaTopicPartition("test-topic", 2));
+		// for every subtask index, run one discoverer per ordering and compare what each one discovers
+		for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
+					topicsDescriptor,
+					subtaskIndex,
+					numSubtasks,
+					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+			partitionDiscoverer.open();
+			// this one must be fed the shuffled list; reusing the ordered list would compare two identical runs
+			TestPartitionDiscoverer partitionDiscovererOutOfOrder = new TestPartitionDiscoverer(
+					topicsDescriptor,
+					subtaskIndex,
+					numSubtasks,
+					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
+			partitionDiscovererOutOfOrder.open();
+
+			List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
+			List<KafkaTopicPartition> discoveredPartitionsOutOfOrder = partitionDiscovererOutOfOrder.discoverPartitions();
+
+			// sort both results so only their contents are compared; they must match regardless of the input ordering
+			Collections.sort(discoveredPartitions, new KafkaTopicPartition.Comparator());
+			Collections.sort(discoveredPartitionsOutOfOrder, new KafkaTopicPartition.Comparator());
+			assertEquals(discoveredPartitions, discoveredPartitionsOutOfOrder);
+		}
+	}
+
 	private static class TestPartitionDiscoverer extends AbstractPartitionDiscoverer {
 
 		private final KafkaTopicsDescriptor topicsDescriptor;
@@ -425,4 +486,8 @@ public class AbstractPartitionDiscovererTest {
 
 		return clone;
 	}
+
+	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int numTasks) { // test oracle: subtask index the assertions expect for a partition
+		return Math.abs(partition.hashCode() % numTasks); // |hash % numTasks|; abs() drops the sign a negative hash gives the remainder
+	}
 }