You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:55 UTC

[9/9] flink git commit: [FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator

[FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa664e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa664e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa664e5b

Branch: refs/heads/master
Commit: fa664e5b9527ec82dae1f18746f7b2f0bbd7a3ba
Parents: 94c65fb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 25 12:25:30 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:29 2016 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   2 +-
 .../operators/AbstractStreamOperatorTest.java   | 400 +++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java |   3 +-
 .../util/OneInputStreamOperatorTestHarness.java |   9 +
 4 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b3da6b2..5f0dd85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -263,7 +263,7 @@ public abstract class AbstractStreamOperator<OUT>
 				KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
 						container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
 						container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
-						container.getIndexInSubtaskGroup());
+						container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 
 				this.keyedStateBackend = container.createKeyedStateBackend(
 						keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
new file mode 100644
index 0000000..21f426b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+
+/**
+ * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
+ * tests timers and state and whether they are correctly checkpointed/restored
+ * with key-group reshuffling.
+ */
+public class AbstractStreamOperatorTest {
+
+	@Test
+	public void testStateDoesNotInterfere() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0);
+		testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
+	}
+
+	/**
+	 * Verify that firing event-time timers see the state of the key that was active
+	 * when the timer was set.
+	 */
+	@Test
+	public void testEventTimeTimersDontInterfere() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.processWatermark(0L);
+
+		testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0);
+
+		testHarness.processWatermark(10L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_EVENT_TIME:HELLO"));
+
+		testHarness.processWatermark(20L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_EVENT_TIME:CIAO"));
+	}
+
+	/**
+	 * Verify that firing processing-time timers see the state of the key that was active
+	 * when the timer was set.
+	 */
+	@Test
+	public void testProcessingTimeTimersDontInterfere() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+
+		testHarness.setProcessingTime(10L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_PROC_TIME:HELLO"));
+
+		testHarness.setProcessingTime(20L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_PROC_TIME:CIAO"));
+	}
+
+	/**
+	 * Verify that timers for the different time domains don't clash.
+	 */
+	@Test
+	public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+		testHarness.processWatermark(0L);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+		testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+
+		testHarness.processWatermark(20L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_EVENT_TIME:HELLO"));
+
+		testHarness.setProcessingTime(10L);
+
+		assertThat(
+				extractResult(testHarness),
+				contains("ON_PROC_TIME:HELLO"));
+	}
+
+	/**
+	 * Verify that state and timers are checkpointed per key group and that they are correctly
+	 * assigned to operator subtasks when restoring.
+	 */
+	@Test
+	public void testStateAndTimerStateShuffling() throws Exception {
+		final int MAX_PARALLELISM = 10;
+
+		// first get two keys that will fall into different key-group ranges that go
+		// to different operator subtasks when we restore
+
+		// get two sub key-ranges so that we can restore two ranges separately
+		KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1);
+		KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1);
+
+		// get two different keys, one per sub range
+		int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM);
+		int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM);
+
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO,
+						MAX_PARALLELISM,
+						1, /* num subtasks */
+						0 /* subtask index */);
+
+		testHarness.open();
+
+		testHarness.processWatermark(0L);
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0);
+		testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0);
+		testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0);
+
+		assertTrue(extractResult(testHarness).isEmpty());
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		// now, restore in two operators, first operator 1
+
+		TestOperator testOperator1 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator1,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO,
+						MAX_PARALLELISM,
+						2, /* num subtasks */
+						0 /* subtask index */);
+
+		testHarness1.setup();
+		testHarness1.initializeState(snapshot);
+		testHarness1.open();
+
+		testHarness1.processWatermark(10L);
+
+		assertThat(extractResult(testHarness1), contains("ON_EVENT_TIME:HELLO"));
+
+		assertTrue(extractResult(testHarness1).isEmpty());
+
+		// this should not trigger anything, the timer for WM=20 should sit in the
+		// other operator subtask
+		testHarness1.processWatermark(20L);
+
+		assertTrue(extractResult(testHarness1).isEmpty());
+
+
+		testHarness1.setProcessingTime(10L);
+
+		assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO"));
+
+		assertTrue(extractResult(testHarness1).isEmpty());
+
+		// this should not trigger anything, the timer for TIME=20 should sit in the
+		// other operator subtask
+		testHarness1.setProcessingTime(20L);
+
+		assertTrue(extractResult(testHarness1).isEmpty());
+
+		// now, for the second operator
+		TestOperator testOperator2 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator2,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO,
+						MAX_PARALLELISM,
+						2, /* num subtasks */
+						1 /* subtask index */);
+
+		testHarness2.setup();
+		testHarness2.initializeState(snapshot);
+		testHarness2.open();
+
+		testHarness2.processWatermark(10L);
+
+		// nothing should happen because this timer is in the other subtask
+		assertTrue(extractResult(testHarness2).isEmpty());
+
+		testHarness2.processWatermark(20L);
+
+		assertThat(extractResult(testHarness2), contains("ON_EVENT_TIME:CIAO"));
+
+		testHarness2.setProcessingTime(10L);
+
+		// nothing should happen because this timer is in the other subtask
+		assertTrue(extractResult(testHarness2).isEmpty());
+
+		testHarness2.setProcessingTime(20L);
+
+		assertThat(extractResult(testHarness2), contains("ON_PROC_TIME:CIAO"));
+
+		assertTrue(extractResult(testHarness2).isEmpty());
+	}
+
+	/**
+	 * Extracts the result values from the test harness and clears the output queue.
+	 */
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
+		List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
+		List<T> result = new ArrayList<>();
+		for (Object in : streamRecords) {
+			if (in instanceof StreamRecord) {
+				result.add((T) ((StreamRecord) in).getValue());
+			}
+		}
+		testHarness.getOutput().clear();
+		return result;
+	}
+
+
+	private static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(Tuple2<Integer, String> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+	/**
+	 * Testing operator that can respond to commands by either setting/deleting state, emitting
+	 * state or setting timers.
+	 */
+	private static class TestOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient InternalTimerService<VoidNamespace> timerService;
+
+		private final ValueStateDescriptor<String> stateDescriptor =
+				new ValueStateDescriptor<>("state", StringSerializer.INSTANCE, null);
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			this.timerService = getInternalTimerService(
+					"test-timers",
+					IntSerializer.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					this);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
+			String[] command = element.getValue().f1.split(":");
+			switch (command[0]) {
+				case "SET_STATE":
+					getPartitionedState(stateDescriptor).update(command[1]);
+					break;
+				case "DELETE_STATE":
+					getPartitionedState(stateDescriptor).clear();
+					break;
+				case "SET_EVENT_TIME_TIMER":
+					timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
+					break;
+				case "SET_PROC_TIME_TIMER":
+					timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
+					break;
+				case "EMIT_STATE":
+					String stateValue = getPartitionedState(stateDescriptor).value();
+					output.collect(new StreamRecord<>("ON_ELEMENT:" + element.getValue().f0 + ":" + stateValue));
+					break;
+				default:
+					throw new IllegalArgumentException();
+			}
+		}
+
+		@Override
+		public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+			String stateValue = getPartitionedState(stateDescriptor).value();
+			output.collect(new StreamRecord<>("ON_EVENT_TIME:" + stateValue));
+		}
+
+		@Override
+		public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+			String stateValue = getPartitionedState(stateDescriptor).value();
+			output.collect(new StreamRecord<>("ON_PROC_TIME:" + stateValue));
+		}
+	}
+
+	private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
+		Random rand = new Random(System.currentTimeMillis());
+		int result = rand.nextInt();
+		while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) {
+			result = rand.nextInt();
+		}
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index af1a7ba..03f3bce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -214,8 +214,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Get all the output from the task and clear the output buffer.
-	 * This contains only StreamRecords.
+	 * Get only the {@link StreamRecord StreamRecords} emitted by the operator.
 	 */
 	@SuppressWarnings("unchecked")
 	public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 105922b..7468d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -49,6 +49,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		this.oneInputOperator = operator;
 	}
 
+	public void processElement(IN value, long timestamp) throws Exception {
+		processElement(new StreamRecord<>(value, timestamp));
+	}
+
+
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		operator.setKeyContextElement1(element);
 		oneInputOperator.processElement(element);
@@ -61,6 +66,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		}
 	}
 
+	public void processWatermark(long watermark) throws Exception {
+		oneInputOperator.processWatermark(new Watermark(watermark));
+	}
+
 	public void processWatermark(Watermark mark) throws Exception {
 		oneInputOperator.processWatermark(mark);
 	}