You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/04 09:24:55 UTC
flink git commit: [FLINK-4960] Add
AbstractStreamOperatorTestHarness.repackageState()
Repository: flink
Updated Branches:
refs/heads/master f4c336b16 -> c38589766
[FLINK-4960] Add AbstractStreamOperatorTestHarness.repackageState()
The new method allows testing operator scale-in (scaling down) by combining
several OperatorStateHandles (as returned by TestHarness.snapshot()) into
one, so that an operator can be restored with a lower parallelism.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3858976
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3858976
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3858976
Branch: refs/heads/master
Commit: c385897661f36122e34ffa6a11996b983e2dc14a
Parents: f4c336b
Author: kl0u <kk...@gmail.com>
Authored: Fri Oct 28 15:37:42 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 4 10:24:40 2016 +0100
----------------------------------------------------------------------
.../operators/AbstractStreamOperatorTest.java | 106 ++++++++++++++++++-
.../util/AbstractStreamOperatorTestHarness.java | 69 ++++++++++++
2 files changed, 173 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 21f426b..fd05353 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
@@ -177,7 +178,7 @@ public class AbstractStreamOperatorTest {
* assigned to operator subtasks when restoring.
*/
@Test
- public void testStateAndTimerStateShuffling() throws Exception {
+ public void testStateAndTimerStateShufflingScalingUp() throws Exception {
final int MAX_PARALLELISM = 10;
// first get two keys that will fall into different key-group ranges that go
@@ -249,7 +250,6 @@ public class AbstractStreamOperatorTest {
assertTrue(extractResult(testHarness1).isEmpty());
-
testHarness1.setProcessingTime(10L);
assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO"));
@@ -299,6 +299,108 @@ public class AbstractStreamOperatorTest {
assertTrue(extractResult(testHarness2).isEmpty());
}
+	@Test
+	public void testStateAndTimerStateShufflingScalingDown() throws Exception {
+		final int MAX_PARALLELISM = 10;
+
+		// scale-down scenario: first get two keys that fall into different key-group ranges,
+		// owned by different operator subtasks at parallelism 2, whose state we later merge into one
+
+		// get two sub key-ranges, one per subtask at the initial parallelism of 2
+		KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1);
+		KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+
+		// get two different keys, one per sub range
+		int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM);
+		int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM);
+
+		TestOperator testOperator1 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator1,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				MAX_PARALLELISM,
+				2, /* num subtasks */
+				0 /* subtask index */);
+
+		testHarness1.setup();
+		testHarness1.open();
+
+		testHarness1.processWatermark(0L);
+		testHarness1.setProcessingTime(0L);
+
+		TestOperator testOperator2 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator2,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				MAX_PARALLELISM,
+				2, /* num subtasks */
+				1 /* subtask index */);
+
+		// set up subtask 1 exactly like subtask 0
+		testHarness2.setup();
+		testHarness2.open();
+
+		testHarness2.processWatermark(0L);
+		testHarness2.setProcessingTime(0L);
+
+		// register state and timers for key1 on subtask 0 and for key2 on subtask 1
+
+		testHarness1.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:30"), 0);
+		testHarness1.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:30"), 0);
+		testHarness1.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0);
+
+		testHarness2.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:40"), 0);
+		testHarness2.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:40"), 0);
+		testHarness2.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0);
+
+		// take a snapshot from each one of the "parallel" instances of the operator
+		// and combine them into one so that we can scale down
+
+		OperatorStateHandles repackagedState =
+			AbstractStreamOperatorTestHarness.repackageState(
+				testHarness1.snapshot(0, 0),
+				testHarness2.snapshot(0, 0)
+			);
+
+		// now, create a third operator that restores the combined state with parallelism 1
+		TestOperator testOperator3 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness3 =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				testOperator3,
+				new TestKeySelector(),
+				BasicTypeInfo.INT_TYPE_INFO,
+				MAX_PARALLELISM,
+				1, /* num subtasks */
+				0 /* subtask index */);
+
+		testHarness3.setup();
+		testHarness3.initializeState(repackagedState);
+		testHarness3.open();
+		// timers from both original subtasks must now fire on the single restored subtask
+		testHarness3.processWatermark(30L);
+		assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:HELLO"));
+		assertTrue(extractResult(testHarness3).isEmpty());
+
+		testHarness3.processWatermark(40L);
+		assertThat(extractResult(testHarness3), contains("ON_EVENT_TIME:CIAO"));
+		assertTrue(extractResult(testHarness3).isEmpty());
+
+		testHarness3.setProcessingTime(30L);
+		assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:HELLO"));
+		assertTrue(extractResult(testHarness3).isEmpty());
+
+		testHarness3.setProcessingTime(40L);
+		assertThat(extractResult(testHarness3), contains("ON_PROC_TIME:CIAO"));
+		assertTrue(extractResult(testHarness3).isEmpty());
+	}
+
/**
* Extracts the result values form the test harness and clear the output queue.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c3858976/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 03f3bce..c923b17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
@@ -309,6 +310,74 @@ public class AbstractStreamOperatorTestHarness<OUT> {
initializeCalled = true;
}
+	/**
+	 * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+	 * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
+	 * and repackages them into a single {@link OperatorStateHandles} so that the parallelism of the test
+	 * can change arbitrarily (i.e. be able to scale both up and down).
+	 *
+	 * <p>After repackaging the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+	 * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
+	 * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
+	 *
+	 * <p><b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+	 * operator (i.e. chain length of one). Only their managed and raw keyed/operator state is merged.
+	 *
+	 * <p>For an example of how to use it, have a look at
+	 * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
+	 *
+	 * @param handles the different states to be merged.
+	 * @return the merged state, the given instance itself if exactly one handle is passed, or {@code null} if none are.
+	 * @throws NullPointerException if more than one handle is passed and any of them is {@code null}.
+	 */
+	public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+		if (handles.length < 1) {
+			return null;
+		} else if (handles.length == 1) {
+			return handles[0];
+		}
+
+		List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
+		List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
+
+		List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+		List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+
+		for (OperatorStateHandles handle: handles) {
+			if (handle == null) {
+				throw new NullPointerException("Cannot repackage a null OperatorStateHandles instance.");
+			}
+
+			Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
+			Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
+			Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
+			Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+
+			if (managedOperatorState != null) {
+				mergedManagedOperatorState.addAll(managedOperatorState);
+			}
+
+			if (rawOperatorState != null) {
+				mergedRawOperatorState.addAll(rawOperatorState);
+			}
+
+			if (managedKeyedState != null) {
+				mergedManagedKeyedState.addAll(managedKeyedState);
+			}
+
+			if (rawKeyedState != null) {
+				mergedRawKeyedState.addAll(rawKeyedState);
+			}
+		}
+
+		return new OperatorStateHandles(
+			0, // NOTE(review): presumably the operator chain index; chain length of one is assumed (see Javadoc)
+			null, // NOTE(review): presumably legacy operator state, which is not carried over from the inputs — confirm
+			mergedManagedKeyedState,
+			mergedRawKeyedState,
+			mergedManagedOperatorState,
+			mergedRawOperatorState);
+	}
/**
* Calls {@link StreamOperator#open()}. This also