You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:52 UTC
[6/9] flink git commit: [FLINK-4892] Add Key-Group Ranges Test in
HeapInternalTimerServiceTest
[FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest
This checks whether key groups are correctly checkpointed and whether we
can correctly restore reassigned key-group ranges.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dc26355
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dc26355
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dc26355
Branch: refs/heads/master
Commit: 9dc26355d2bfebcee05ba1185debfd5706d4f7a9
Parents: e3b5d33
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 24 14:40:34 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200
----------------------------------------------------------------------
.../operators/HeapInternalTimerServiceTest.java | 113 +++++++++++++++++++
1 file changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9dc26355/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index bba6517..d753e4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -608,6 +608,119 @@ public class HeapInternalTimerServiceTest {
assertEquals(0, timerService.numEventTimeTimers());
}
+ /**
+ * This test checks whether timers are assigned to correct key groups and whether snapshot/restore respects key groups.
+ * Timers are registered under two keys (one per half of {@code testKeyGroupRange}), snapshotted per key group, restored into two services (one per half), and each restored service must fire only the timers of its own key.
+ */
+ @Test
+ public void testSnapshotAndRebalancingRestore() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+ // original timer service, created for the whole testKeyGroupRange
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
+ // midpoint is computed as start + (end - start) / 2 rather than (start + end) / 2
+ int midpoint = testKeyGroupRange.getStartKeyGroup() +
+ (testKeyGroupRange.getEndKeyGroup() - testKeyGroupRange.getStartKeyGroup()) / 2;
+
+ // get two sub key-ranges so that we can restore two ranges separately: (start, midpoint) and (midpoint + 1, end)
+ KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(testKeyGroupRange.getStartKeyGroup(), midpoint);
+ KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, testKeyGroupRange.getEndKeyGroup());
+
+ // get two different keys, one per sub range
+ int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
+ int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);
+ // under key1: processing-time timer for "ciao" and event-time timer for "hello", both with timestamp 10 (the String is presumably the timer namespace -- confirm)
+ keyContext.setCurrentKey(key1);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+ // under key2 the pairing is swapped: event-time timer for "ciao", processing-time timer for "hello"
+ keyContext.setCurrentKey(key2);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+ // sanity check before snapshotting: two timers of each kind in total, one per String argument
+ assertEquals(2, timerService.numProcessingTimeTimers());
+ assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+ assertEquals(2, timerService.numEventTimeTimers());
+ assertEquals(1, timerService.numEventTimeTimers("hello"));
+ assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+ // one map per sub key-group range, each mapping a key-group index to the bytes written by snapshotTimersForKeyGroup
+ Map<Integer, byte[]> snapshot1 = new HashMap<>();
+ Map<Integer, byte[]> snapshot2 = new HashMap<>();
+ for (Integer keyGroupIndex : testKeyGroupRange) {
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex);
+ outStream.close();
+ if (subKeyGroupRange1.contains(keyGroupIndex)) {
+ snapshot1.put(keyGroupIndex, outStream.toByteArray());
+ } else if (subKeyGroupRange2.contains(keyGroupIndex)) {
+ snapshot2.put(keyGroupIndex, outStream.toByteArray());
+ } else {
+ throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
+ }
+ }
+
+ // from now on we need everything twice. once per sub key-group range
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable1 = mock(Triggerable.class);
+
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
+
+
+ TestKeyContext keyContext1 = new TestKeyContext();
+ TestKeyContext keyContext2 = new TestKeyContext();
+
+ TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
+ TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();
+ // each restored service gets only the snapshot map and the sub key-group range of its own half
+ HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService(
+ snapshot1,
+ mockTriggerable1,
+ keyContext1,
+ processingTimeService1,
+ subKeyGroupRange1,
+ maxParallelism);
+
+ HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService(
+ snapshot2,
+ mockTriggerable2,
+ keyContext2,
+ processingTimeService2,
+ subKeyGroupRange2,
+ maxParallelism);
+
+ // first half: move processing time and the watermark to 10, the timestamp of every registered timer
+ processingTimeService1.setCurrentTime(10);
+ timerService1.advanceWatermark(10);
+ // exactly one callback of each kind is expected, for key1's timers only; key2's timers must never reach this service
+ verify(mockTriggerable1, times(1)).onProcessingTime(anyInternalTimer());
+ verify(mockTriggerable1, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+ verify(mockTriggerable1, never()).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
+ verify(mockTriggerable1, times(1)).onEventTime(anyInternalTimer());
+ verify(mockTriggerable1, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+ verify(mockTriggerable1, never()).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+
+ assertEquals(0, timerService1.numEventTimeTimers());
+ // second half: same procedure, but now only key2's timers may fire
+ processingTimeService2.setCurrentTime(10);
+ timerService2.advanceWatermark(10);
+
+ verify(mockTriggerable2, times(1)).onProcessingTime(anyInternalTimer());
+ verify(mockTriggerable2, never()).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+ verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
+ verify(mockTriggerable2, times(1)).onEventTime(anyInternalTimer());
+ verify(mockTriggerable2, never()).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+ verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+
+ assertEquals(0, timerService2.numEventTimeTimers());
+ }
private static class TestKeyContext implements KeyContext {