You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:55 UTC
[9/9] flink git commit: [FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator
[FLINK-4907] Add Test for Timers/State Provided by AbstractStreamOperator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa664e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa664e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa664e5b
Branch: refs/heads/master
Commit: fa664e5b9527ec82dae1f18746f7b2f0bbd7a3ba
Parents: 94c65fb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 25 12:25:30 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:29 2016 +0200
----------------------------------------------------------------------
.../api/operators/AbstractStreamOperator.java | 2 +-
.../operators/AbstractStreamOperatorTest.java | 400 +++++++++++++++++++
.../util/AbstractStreamOperatorTestHarness.java | 3 +-
.../util/OneInputStreamOperatorTestHarness.java | 9 +
4 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b3da6b2..5f0dd85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -263,7 +263,7 @@ public abstract class AbstractStreamOperator<OUT>
KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
- container.getIndexInSubtaskGroup());
+ container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
this.keyedStateBackend = container.createKeyedStateBackend(
keySerializer,
http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
new file mode 100644
index 0000000..21f426b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+
+/**
+ * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
+ * tests timers and state and whether they are correctly checkpointed/restored
+ * with key-group reshuffling.
+ */
+public class AbstractStreamOperatorTest {
+
+ /**
+ * Verify that state written under one key is not visible when processing elements
+ * of another key.
+ */
+ @Test
+ public void testStateDoesNotInterfere() throws Exception {
+ TestOperator testOperator = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+ testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+ testHarness.processElement(new Tuple2<>(1, "EMIT_STATE"), 0);
+ testHarness.processElement(new Tuple2<>(0, "EMIT_STATE"), 0);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_ELEMENT:1:CIAO", "ON_ELEMENT:0:HELLO"));
+ }
+
+ /**
+ * Verify that firing event-time timers see the state of the key that was active
+ * when the timer was set.
+ */
+ @Test
+ public void testEventTimeTimersDontInterfere() throws Exception {
+ TestOperator testOperator = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.processWatermark(0L);
+
+ testHarness.processElement(new Tuple2<>(1, "SET_EVENT_TIME_TIMER:20"), 0);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+ testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:10"), 0);
+
+ testHarness.processWatermark(10L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:HELLO"));
+
+ testHarness.processWatermark(20L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:CIAO"));
+ }
+
+ /**
+ * Verify that firing processing-time timers see the state of the key that was active
+ * when the timer was set.
+ */
+ @Test
+ public void testProcessingTimeTimersDontInterfere() throws Exception {
+ TestOperator testOperator = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.setProcessingTime(0L);
+
+ testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+ testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+
+ testHarness.setProcessingTime(10L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:HELLO"));
+
+ testHarness.setProcessingTime(20L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:CIAO"));
+ }
+
+ /**
+ * Verify that timers for the different time domains don't clash.
+ */
+ @Test
+ public void testProcessingTimeAndEventTimeDontInterfere() throws Exception {
+ TestOperator testOperator = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.open();
+
+ testHarness.setProcessingTime(0L);
+ testHarness.processWatermark(0L);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+ testHarness.processElement(new Tuple2<>(0, "SET_EVENT_TIME_TIMER:20"), 0);
+
+ testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+
+ testHarness.processWatermark(20L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_EVENT_TIME:HELLO"));
+
+ testHarness.setProcessingTime(10L);
+
+ assertThat(
+ extractResult(testHarness),
+ contains("ON_PROC_TIME:HELLO"));
+ }
+
+ /**
+ * Verify that state and timers are checkpointed per key group and that they are correctly
+ * assigned to operator subtasks when restoring.
+ */
+ @Test
+ public void testStateAndTimerStateShuffling() throws Exception {
+ final int MAX_PARALLELISM = 10;
+ final int RESTORE_PARALLELISM = 2;
+
+ // first get two keys that will fall into different key-group ranges that go
+ // to different operator subtasks when we restore
+
+ // derive the sub key-group ranges with the same assignment function the operator uses,
+ // so each key is guaranteed to land in the intended restored subtask instead of relying
+ // on a hand-computed split of [0, MAX_PARALLELISM)
+ KeyGroupRange subKeyGroupRange1 = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+ MAX_PARALLELISM, RESTORE_PARALLELISM, 0);
+ KeyGroupRange subKeyGroupRange2 = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+ MAX_PARALLELISM, RESTORE_PARALLELISM, 1);
+
+ // get two different keys, one per sub range
+ int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM);
+ int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM);
+
+ TestOperator testOperator = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ MAX_PARALLELISM,
+ 1, /* num subtasks */
+ 0 /* subtask index */);
+
+ testHarness.open();
+
+ testHarness.processWatermark(0L);
+ testHarness.setProcessingTime(0L);
+
+ testHarness.processElement(new Tuple2<>(key1, "SET_EVENT_TIME_TIMER:10"), 0);
+ testHarness.processElement(new Tuple2<>(key2, "SET_EVENT_TIME_TIMER:20"), 0);
+
+ testHarness.processElement(new Tuple2<>(key1, "SET_PROC_TIME_TIMER:10"), 0);
+ testHarness.processElement(new Tuple2<>(key2, "SET_PROC_TIME_TIMER:20"), 0);
+
+ testHarness.processElement(new Tuple2<>(key1, "SET_STATE:HELLO"), 0);
+ testHarness.processElement(new Tuple2<>(key2, "SET_STATE:CIAO"), 0);
+
+ assertTrue(extractResult(testHarness).isEmpty());
+
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ // now, restore in two operators, first operator 1
+
+ TestOperator testOperator1 = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator1,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ MAX_PARALLELISM,
+ RESTORE_PARALLELISM, /* num subtasks */
+ 0 /* subtask index */);
+
+ testHarness1.setup();
+ testHarness1.initializeState(snapshot);
+ testHarness1.open();
+
+ testHarness1.processWatermark(10L);
+
+ assertThat(extractResult(testHarness1), contains("ON_EVENT_TIME:HELLO"));
+
+ assertTrue(extractResult(testHarness1).isEmpty());
+
+ // this should not trigger anything, the trigger for WM=20 should sit in the
+ // other operator subtask
+ testHarness1.processWatermark(20L);
+
+ assertTrue(extractResult(testHarness1).isEmpty());
+
+ testHarness1.setProcessingTime(10L);
+
+ assertThat(extractResult(testHarness1), contains("ON_PROC_TIME:HELLO"));
+
+ assertTrue(extractResult(testHarness1).isEmpty());
+
+ // this should not trigger anything, the trigger for TIME=20 should sit in the
+ // other operator subtask
+ testHarness1.setProcessingTime(20L);
+
+ assertTrue(extractResult(testHarness1).isEmpty());
+
+ // now, for the second operator
+ TestOperator testOperator2 = new TestOperator();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness2 =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ testOperator2,
+ new TestKeySelector(),
+ BasicTypeInfo.INT_TYPE_INFO,
+ MAX_PARALLELISM,
+ RESTORE_PARALLELISM, /* num subtasks */
+ 1 /* subtask index */);
+
+ testHarness2.setup();
+ testHarness2.initializeState(snapshot);
+ testHarness2.open();
+
+ testHarness2.processWatermark(10L);
+
+ // nothing should happen because this timer is in the other subtask
+ assertTrue(extractResult(testHarness2).isEmpty());
+
+ testHarness2.processWatermark(20L);
+
+ assertThat(extractResult(testHarness2), contains("ON_EVENT_TIME:CIAO"));
+
+ testHarness2.setProcessingTime(10L);
+
+ // nothing should happen because this timer is in the other subtask
+ assertTrue(extractResult(testHarness2).isEmpty());
+
+ testHarness2.setProcessingTime(20L);
+
+ assertThat(extractResult(testHarness2), contains("ON_PROC_TIME:CIAO"));
+
+ assertTrue(extractResult(testHarness2).isEmpty());
+ }
+
+ /**
+ * Extracts the values of the emitted {@link StreamRecord StreamRecords} from the test
+ * harness and clears the harness's output queue.
+ *
+ * <p>{@code extractOutputStreamRecords()} already filters the output down to stream
+ * records, so no {@code instanceof} check or unchecked cast is needed here.
+ */
+ private <T> List<T> extractResult(OneInputStreamOperatorTestHarness<?, T> testHarness) {
+ List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
+ List<T> result = new ArrayList<>(streamRecords.size());
+ for (StreamRecord<? extends T> record : streamRecords) {
+ result.add(record.getValue());
+ }
+ testHarness.getOutput().clear();
+ return result;
+ }
+
+ /**
+ * Keys each {@code Tuple2} by its first field.
+ */
+ private static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Tuple2<Integer, String> value) throws Exception {
+ return value.f0;
+ }
+ }
+
+ /**
+ * Testing operator that can respond to commands by either setting/deleting state, emitting
+ * state or setting timers. Commands are read from the {@code String} field of the input
+ * tuple in the form {@code COMMAND} or {@code COMMAND:ARGUMENT}.
+ */
+ private static class TestOperator
+ extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient InternalTimerService<VoidNamespace> timerService;
+
+ private final ValueStateDescriptor<String> stateDescriptor =
+ new ValueStateDescriptor<>("state", StringSerializer.INSTANCE, null);
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ this.timerService = getInternalTimerService(
+ "test-timers",
+ IntSerializer.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ this);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
+ String[] command = element.getValue().f1.split(":");
+ switch (command[0]) {
+ case "SET_STATE":
+ getPartitionedState(stateDescriptor).update(command[1]);
+ break;
+ case "DELETE_STATE":
+ getPartitionedState(stateDescriptor).clear();
+ break;
+ case "SET_EVENT_TIME_TIMER":
+ timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
+ break;
+ case "SET_PROC_TIME_TIMER":
+ timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
+ break;
+ case "EMIT_STATE":
+ String stateValue = getPartitionedState(stateDescriptor).value();
+ output.collect(new StreamRecord<>("ON_ELEMENT:" + element.getValue().f0 + ":" + stateValue));
+ break;
+ default:
+ // include the offending command so a typo in a test is easy to diagnose
+ throw new IllegalArgumentException("Unknown command: " + command[0]);
+ }
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+ String stateValue = getPartitionedState(stateDescriptor).value();
+ output.collect(new StreamRecord<>("ON_EVENT_TIME:" + stateValue));
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+ String stateValue = getPartitionedState(stateDescriptor).value();
+ output.collect(new StreamRecord<>("ON_PROC_TIME:" + stateValue));
+ }
+ }
+
+ /**
+ * Returns an {@code int} key that {@link KeyGroupRangeAssignment#assignToKeyGroup} maps
+ * into the given key-group range.
+ *
+ * <p>Uses a fixed seed (instead of the current time) so the chosen keys, and therefore
+ * any test failure, are reproducible across runs.
+ */
+ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
+ Random rand = new Random(42L);
+ int result = rand.nextInt();
+ while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) {
+ result = rand.nextInt();
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index af1a7ba..03f3bce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -214,8 +214,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
}
/**
- * Get all the output from the task and clear the output buffer.
- * This contains only StreamRecords.
+ * Get only the {@link StreamRecord StreamRecords} emitted by the operator.
*/
@SuppressWarnings("unchecked")
public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
http://git-wip-us.apache.org/repos/asf/flink/blob/fa664e5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 105922b..7468d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -49,6 +49,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
this.oneInputOperator = operator;
}
+ /**
+ * Convenience overload: wraps {@code value} and {@code timestamp} in a {@link StreamRecord}
+ * and hands it to {@link #processElement(StreamRecord)}.
+ */
+ public void processElement(IN value, long timestamp) throws Exception {
+ StreamRecord<IN> record = new StreamRecord<>(value, timestamp);
+ processElement(record);
+ }
+
+
public void processElement(StreamRecord<IN> element) throws Exception {
operator.setKeyContextElement1(element);
oneInputOperator.processElement(element);
@@ -61,6 +66,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
}
}
+ /**
+ * Convenience overload: wraps {@code watermark} in a {@link Watermark} and forwards it to
+ * {@link #processWatermark(Watermark)}, mirroring how {@code processElement(IN, long)}
+ * delegates, so both overloads share a single code path (and any override of it).
+ */
+ public void processWatermark(long watermark) throws Exception {
+ processWatermark(new Watermark(watermark));
+ }
+
/**
 * Passes the given watermark directly to the wrapped one-input operator.
 */
public void processWatermark(Watermark mark) throws Exception {
oneInputOperator.processWatermark(mark);
}