You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:54 UTC

[8/9] flink git commit: [FLINK-4892] Parameterize HeapInternalTimerServiceTest

[FLINK-4892] Parameterize HeapInternalTimerServiceTest

The test now runs once for each of several combinations of key-group range (start, end) and max parallelism.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b5d332
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b5d332
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b5d332

Branch: refs/heads/master
Commit: e3b5d33237430acb65d1a93531448825a76c7ce5
Parents: b673760
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 24 14:39:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../operators/HeapInternalTimerServiceTest.java | 243 +++++++++++++------
 1 file changed, 169 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3b5d332/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 09499c2..bba6517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -28,13 +28,19 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -45,17 +51,21 @@ import static org.mockito.Mockito.*;
 /**
  * Tests for {@link HeapInternalTimerService}.
  */
+@RunWith(Parameterized.class)
 public class HeapInternalTimerServiceTest {
 
-	private static final int startKeyGroupIdx = 0;
-	private static final int endKeyGroupIdx = 10;
-	private static final KeyGroupsList testKeyGroupList =
-		new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx);
+	private final int maxParallelism;
+	private final KeyGroupRange testKeyGroupRange;
 
 	private static InternalTimer<Integer, String> anyInternalTimer() {
 		return any();
 	}
 
+	public HeapInternalTimerServiceTest(int startKeyGroup, int endKeyGroup, int maxParallelism) {
+		this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, endKeyGroup);
+		this.maxParallelism = maxParallelism;
+	}
+
 	@Test
 	public void testKeyGroupStartIndexSetting() {
 
@@ -151,9 +161,10 @@ public class HeapInternalTimerServiceTest {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		keyContext.setCurrentKey(key);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("ciao", 20);
@@ -212,9 +223,11 @@ public class HeapInternalTimerServiceTest {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+
+		keyContext.setCurrentKey(key);
 
 		timerService.registerProcessingTimeTimer("ciao", 20);
 
@@ -243,9 +256,11 @@ public class HeapInternalTimerServiceTest {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
 		final HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
+
+		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		keyContext.setCurrentKey(key);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 
@@ -293,7 +308,7 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
 		processingTimeService.setCurrentTime(17L);
 		assertEquals(17, timerService.currentProcessingTime());
@@ -311,7 +326,7 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
 		timerService.advanceWatermark(17);
 		assertEquals(17, timerService.currentWatermark());
@@ -331,14 +346,21 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		keyContext.setCurrentKey(key1);
 
 		timerService.registerEventTimeTimer("ciao", 10);
 		timerService.registerEventTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 
 		timerService.registerEventTimeTimer("ciao", 10);
 		timerService.registerEventTimeTimer("hello", 10);
@@ -350,10 +372,10 @@ public class HeapInternalTimerServiceTest {
 		timerService.advanceWatermark(10);
 
 		verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer());
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello")));
 
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
@@ -369,14 +391,21 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		keyContext.setCurrentKey(key1);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("hello", 10);
@@ -388,10 +417,10 @@ public class HeapInternalTimerServiceTest {
 		processingTimeService.setCurrentTime(10);
 
 		verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer());
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
 
 		assertEquals(0, timerService.numProcessingTimeTimers());
 	}
@@ -409,14 +438,21 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
+
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
 
-		keyContext.setCurrentKey(0);
+		keyContext.setCurrentKey(key1);
 
 		timerService.registerEventTimeTimer("ciao", 10);
 		timerService.registerEventTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 
 		timerService.registerEventTimeTimer("ciao", 10);
 		timerService.registerEventTimeTimer("hello", 10);
@@ -425,10 +461,10 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(2, timerService.numEventTimeTimers("hello"));
 		assertEquals(2, timerService.numEventTimeTimers("ciao"));
 
-		keyContext.setCurrentKey(0);
+		keyContext.setCurrentKey(key1);
 		timerService.deleteEventTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 		timerService.deleteEventTimeTimer("ciao", 10);
 
 		assertEquals(2, timerService.numEventTimeTimers());
@@ -438,10 +474,10 @@ public class HeapInternalTimerServiceTest {
 		timerService.advanceWatermark(10);
 
 		verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer());
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
-		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
-		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
-		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello")));
 
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
@@ -459,14 +495,21 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		keyContext.setCurrentKey(key1);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("hello", 10);
@@ -475,10 +518,10 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
 
-		keyContext.setCurrentKey(0);
+		keyContext.setCurrentKey(key1);
 		timerService.deleteProcessingTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 		timerService.deleteProcessingTimeTimer("ciao", 10);
 
 		assertEquals(2, timerService.numProcessingTimeTimers());
@@ -488,10 +531,10 @@ public class HeapInternalTimerServiceTest {
 		processingTimeService.setCurrentTime(10);
 
 		verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer());
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
-		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
-		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
-		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
 
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
@@ -504,14 +547,21 @@ public class HeapInternalTimerServiceTest {
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		HeapInternalTimerService<Integer, String> timerService =
-				createTimerService(mockTriggerable, keyContext, processingTimeService);
+				createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);
 
-		keyContext.setCurrentKey(0);
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		keyContext.setCurrentKey(key1);
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 		timerService.registerEventTimeTimer("hello", 10);
 
-		keyContext.setCurrentKey(1);
+		keyContext.setCurrentKey(key2);
 
 		timerService.registerEventTimeTimer("ciao", 10);
 		timerService.registerProcessingTimeTimer("hello", 10);
@@ -523,11 +573,13 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(1, timerService.numEventTimeTimers("hello"));
 		assertEquals(1, timerService.numEventTimeTimers("ciao"));
 
-		ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-		for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < endKeyGroupIdx; keyGroupIdx++) {
-			timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIdx);
+		Map<Integer, byte[]> snapshot = new HashMap<>();
+		for (Integer keyGroupIndex : testKeyGroupRange) {
+			ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+			timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex);
+			outStream.close();
+			snapshot.put(keyGroupIndex, outStream.toByteArray());
 		}
-		outStream.close();
 
 		@SuppressWarnings("unchecked")
 		Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
@@ -536,20 +588,22 @@ public class HeapInternalTimerServiceTest {
 		processingTimeService = new TestProcessingTimeService();
 
 		timerService = restoreTimerService(
-				new ByteArrayInputStream(outStream.toByteArray()),
+				snapshot,
 				mockTriggerable2,
 				keyContext,
-				processingTimeService);
+				processingTimeService,
+				testKeyGroupRange,
+				maxParallelism);
 
 		processingTimeService.setCurrentTime(10);
 		timerService.advanceWatermark(10);
 
 		verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer());
-		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
-		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
+		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
 		verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer());
-		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
-		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
+		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));
 
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
@@ -570,45 +624,86 @@ public class HeapInternalTimerServiceTest {
 		}
 	}
 
+	private static int getKeyInKeyGroup(int keyGroup, int maxParallelism) {
+		Random rand = new Random(System.currentTimeMillis());
+		int result = rand.nextInt();
+		while (KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism) != keyGroup) {
+			result = rand.nextInt();
+		}
+		return result;
+	}
+
+	private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
+		Random rand = new Random(System.currentTimeMillis());
+		int result = rand.nextInt();
+		while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) {
+			result = rand.nextInt();
+		}
+		return result;
+	}
+
 	private static HeapInternalTimerService<Integer, String> createTimerService(
 			Triggerable<Integer, String> triggerable,
 			KeyContext keyContext,
-			ProcessingTimeService processingTimeService) {
+			ProcessingTimeService processingTimeService,
+			KeyGroupsList keyGroupList,
+			int maxParallelism) {
 		HeapInternalTimerService<Integer, String> service =
 			new HeapInternalTimerService<>(
-				testKeyGroupList.getNumberOfKeyGroups(),
-				testKeyGroupList,
-				keyContext,
-				processingTimeService);
+					maxParallelism,
+					keyGroupList,
+					keyContext,
+					processingTimeService);
 
 		service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
 		return service;
 	}
 
 	private static HeapInternalTimerService<Integer, String> restoreTimerService(
-			InputStream stateStream,
+			Map<Integer, byte[]> state,
 			Triggerable<Integer, String> triggerable,
 			KeyContext keyContext,
-			ProcessingTimeService processingTimeService) throws Exception {
+			ProcessingTimeService processingTimeService,
+			KeyGroupsList keyGroupsList,
+			int maxParallelism) throws Exception {
 
 		// create an empty service
 		HeapInternalTimerService<Integer, String> service =
 			new HeapInternalTimerService<>(
-				testKeyGroupList.getNumberOfKeyGroups(),
-				testKeyGroupList,
-				keyContext,
-				processingTimeService);
+					maxParallelism,
+					keyGroupsList,
+					keyContext,
+					processingTimeService);
 
 		// restore the timers
-		for (int keyGroupIdx = startKeyGroupIdx; keyGroupIdx < endKeyGroupIdx; keyGroupIdx++) {
-			service.restoreTimersForKeyGroup(
-				new DataInputViewStreamWrapper(stateStream),
-				keyGroupIdx,
-				HeapInternalTimerServiceTest.class.getClassLoader());
+		for (Integer keyGroupIndex : keyGroupsList) {
+			if (state.containsKey(keyGroupIndex)) {
+				service.restoreTimersForKeyGroup(
+						new DataInputViewStreamWrapper(new ByteArrayInputStream(state.get(keyGroupIndex))),
+						keyGroupIndex,
+						HeapInternalTimerServiceTest.class.getClassLoader());
+			}
 		}
 
 		// initialize the service
 		service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
 		return service;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Parametrization for testing with different key-group ranges
+	// ------------------------------------------------------------------------
+
+	@Parameterized.Parameters(name = "start = {0}, end = {1}, max = {2}")
+	@SuppressWarnings("unchecked,rawtypes")
+	public static Collection<Object[]> keyRanges(){
+		return Arrays.asList(new Object[][] {
+						{0, Short.MAX_VALUE - 1, Short.MAX_VALUE},
+						{0, 10, Short.MAX_VALUE},
+						{0, 10, 10},
+						{10, Short.MAX_VALUE - 1, Short.MAX_VALUE},
+						{2, 5, 100},
+						{2, 5, 6}
+		});
+	}
 }