You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:52 UTC

[6/9] flink git commit: [FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest

[FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest

This checks whether key groups are correctly checkpointed and whether we
can correctly restore reassigned key-group ranges.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dc26355
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dc26355
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dc26355

Branch: refs/heads/master
Commit: 9dc26355d2bfebcee05ba1185debfd5706d4f7a9
Parents: e3b5d33
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 24 14:40:34 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../operators/HeapInternalTimerServiceTest.java | 113 +++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9dc26355/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index bba6517..d753e4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -608,6 +608,119 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
 
+	/**
+	 * Checks that timers are snapshotted per key group and that, after restoring the two halves of the
+	 * key-group range into two separate timer services, each service fires only the timers of its own key.
+	 */
+	@Test
+	public void testSnapshotAndRebalancingRestore() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
+
+		int midpoint = testKeyGroupRange.getStartKeyGroup() +
+				(testKeyGroupRange.getEndKeyGroup() - testKeyGroupRange.getStartKeyGroup()) / 2;
+
+		// split the range at the midpoint: [start, midpoint] and [midpoint + 1, end], so both halves can be restored separately
+		KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(testKeyGroupRange.getStartKeyGroup(), midpoint);
+		KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, testKeyGroupRange.getEndKeyGroup());
+
+		// get two different keys, one per sub range (getKeyInKeyGroupRange presumably returns a key mapping into the given range)
+		int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);
+
+		keyContext.setCurrentKey(key1);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(key2);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		assertEquals(2, timerService.numProcessingTimeTimers());
+		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+		assertEquals(2, timerService.numEventTimeTimers());
+		assertEquals(1, timerService.numEventTimeTimers("hello"));
+		assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+		// one map (key-group index -> serialized timers of that key group) per sub key-group range
+		Map<Integer, byte[]> snapshot1 = new HashMap<>();
+		Map<Integer, byte[]> snapshot2 = new HashMap<>();
+		for (Integer keyGroupIndex : testKeyGroupRange) {
+			ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+			timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex);
+			outStream.close();
+			if (subKeyGroupRange1.contains(keyGroupIndex)) {
+				snapshot1.put(keyGroupIndex, outStream.toByteArray());
+			} else if (subKeyGroupRange2.contains(keyGroupIndex)) {
+				snapshot2.put(keyGroupIndex, outStream.toByteArray());
+			} else {
+				throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
+			}
+		}
+
+		// from now on we need everything twice, once per sub key-group range
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable1 = mock(Triggerable.class);
+
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
+
+		// each restored timer service gets its own key context and its own processing-time service
+		TestKeyContext keyContext1 = new TestKeyContext();
+		TestKeyContext keyContext2 = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
+		TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();
+
+		HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService(
+				snapshot1,
+				mockTriggerable1,
+				keyContext1,
+				processingTimeService1,
+				subKeyGroupRange1,
+				maxParallelism);
+
+		HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService(
+				snapshot2,
+				mockTriggerable2,
+				keyContext2,
+				processingTimeService2,
+				subKeyGroupRange2,
+				maxParallelism);
+
+		// move processing time and the watermark to the timers' timestamp (10); only key1's timers are expected to fire here
+		processingTimeService1.setCurrentTime(10);
+		timerService1.advanceWatermark(10);
+
+		verify(mockTriggerable1, times(1)).onProcessingTime(anyInternalTimer());
+		verify(mockTriggerable1, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable1, never()).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
+		verify(mockTriggerable1, times(1)).onEventTime(anyInternalTimer());
+		verify(mockTriggerable1, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable1, never()).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+
+		assertEquals(0, timerService1.numEventTimeTimers());
+
+		processingTimeService2.setCurrentTime(10);
+		timerService2.advanceWatermark(10);
+
+		verify(mockTriggerable2, times(1)).onProcessingTime(anyInternalTimer());
+		verify(mockTriggerable2, never()).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
+		verify(mockTriggerable2, times(1)).onEventTime(anyInternalTimer());
+		verify(mockTriggerable2, never()).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+
+		assertEquals(0, timerService2.numEventTimeTimers());
+	}
 
 	private static class TestKeyContext implements KeyContext {