You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/25 10:20:11 UTC
flink git commit: [FLINK-3740] Make Session Window State Checkpointed
Repository: flink
Updated Branches:
refs/heads/master e48a5f19e -> f2f5bd5be
[FLINK-3740] Make Session Window State Checkpointed
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2f5bd5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2f5bd5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2f5bd5b
Branch: refs/heads/master
Commit: f2f5bd5bed2f737b8418be09359691e899462184
Parents: e48a5f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 12 16:27:18 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Apr 25 10:18:40 2016 +0200
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 17 +--
.../operators/windowing/MergingWindowSet.java | 20 +++
.../operators/windowing/WindowOperator.java | 86 ++++++++++--
.../operators/windowing/WindowOperatorTest.java | 131 +++++++++++++------
.../util/OneInputStreamOperatorTestHarness.java | 37 +++++-
5 files changed, 229 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 1e4e453..84ee0b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -82,15 +82,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
element.getTimestamp());
- K key = (K) getStateBackend().getCurrentKey();
+ final K key = (K) getStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
- MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
- if (mergingWindows == null) {
- mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
- mergingWindowsByKey.put(key, mergingWindows);
- }
+ MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window : elementWindows) {
// If there is a merge, it can only result in a window that contains our new
@@ -107,6 +103,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+ context.key = key;
context.window = mergeResult;
// store for later use
@@ -141,7 +138,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
mergeTriggerResult.f0);
- processTriggerResult(combinedTriggerResult, key, actualWindow);
+ processTriggerResult(combinedTriggerResult, actualWindow);
}
} else {
@@ -157,14 +154,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
context.window = window;
TriggerResult triggerResult = context.onElement(element);
- processTriggerResult(triggerResult, key, window);
+ processTriggerResult(triggerResult, window);
}
}
}
@Override
@SuppressWarnings("unchecked,rawtypes")
- protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
+ protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
if (!triggerResult.isFire() && !triggerResult.isPurge()) {
// do nothing
return;
@@ -175,7 +172,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = mergingWindowsByKey.get(key);
+ mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 7ef1af4..49a2017 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.slf4j.Logger;
@@ -73,6 +75,24 @@ public class MergingWindowSet<W extends Window> {
}
/**
+ * Restores a {@link MergingWindowSet} from the given state.
+ */
+ public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner, ListState<Tuple2<W, W>> state) throws Exception {
+ this.windowAssigner = windowAssigner;
+ windows = new HashMap<>();
+
+ for (Tuple2<W, W> window: state.get()) {
+ windows.put(window.f0, window.f1);
+ }
+ }
+
+ public void persist(ListState<Tuple2<W, W>> state) throws Exception {
+ for (Map.Entry<W, W> window: windows.entrySet()) {
+ state.add(new Tuple2<>(window.getKey(), window.getValue()));
+ }
+ }
+
+ /**
* Returns the state window for the given in-flight {@code Window}. The state window is the
* {@code Window} in which we keep the actual state of a given in-flight window. Windows
* might expand but we keep to original state window for keeping the elements of the window
http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c106e70..919cee7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
@@ -29,10 +31,13 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
@@ -224,6 +229,25 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public final void close() throws Exception {
super.close();
+ timestampedCollector = null;
+ watermarkTimers = null;
+ watermarkTimersQueue = null;
+ processingTimeTimers = null;
+ processingTimeTimersQueue = null;
+ context = null;
+ mergingWindowsByKey = null;
+ }
+
+ @Override
+ public void dispose() {
+ super.dispose();
+ timestampedCollector = null;
+ watermarkTimers = null;
+ watermarkTimersQueue = null;
+ processingTimeTimers = null;
+ processingTimeTimersQueue = null;
+ context = null;
+ mergingWindowsByKey = null;
}
@Override
@@ -231,15 +255,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
- K key = (K) getStateBackend().getCurrentKey();
+ final K key = (K) getStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
- MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
- if (mergingWindows == null) {
- mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
- mergingWindowsByKey.put(key, mergingWindows);
- }
-
+ MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window: elementWindows) {
// If there is a merge, it can only result in a window that contains our new
@@ -255,6 +274,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+ context.key = key;
context.window = mergeResult;
// store for later use
@@ -286,7 +306,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
- processTriggerResult(combinedTriggerResult, key, actualWindow);
+ processTriggerResult(combinedTriggerResult, actualWindow);
}
} else {
@@ -301,13 +321,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.window = window;
TriggerResult triggerResult = context.onElement(element);
- processTriggerResult(triggerResult, key, window);
+ processTriggerResult(triggerResult, window);
}
}
}
+ /**
+ * Retrieves the {@link MergingWindowSet} for the currently active key. The caller must
+ * ensure that the correct key is set in the state backend.
+ */
+ @SuppressWarnings("unchecked")
+ protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+ MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+ if (mergingWindows == null) {
+ // try to retrieve from state
+
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+ ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+
+ mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
+ mergeState.clear();
+
+ mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
+ }
+ return mergingWindows;
+ }
+
+
+ /**
+ * Process {@link TriggerResult} for the currently active key and the given window. The caller
+ * must ensure that the correct key is set in the state backend and the context object.
+ */
@SuppressWarnings("unchecked")
- protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
+ protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
if (!triggerResult.isFire() && !triggerResult.isPurge()) {
// do nothing
return;
@@ -318,7 +365,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = mergingWindowsByKey.get(key);
+ mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
@@ -366,7 +413,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.window = timer.window;
setKeyContext(timer.key);
TriggerResult triggerResult = context.onEventTime(timer.timestamp);
- processTriggerResult(triggerResult, context.key, context.window);
+ processTriggerResult(triggerResult, context.window);
} else {
fire = false;
}
@@ -389,7 +436,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.window = timer.window;
setKeyContext(timer.key);
TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
- processTriggerResult(triggerResult, context.key, context.window);
+ processTriggerResult(triggerResult, context.window);
} else {
fire = false;
}
@@ -604,7 +651,20 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// ------------------------------------------------------------------------
@Override
+ @SuppressWarnings("unchecked")
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+
+ if (mergingWindowsByKey != null) {
+ TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+ ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+ for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
+ setKeyContext(key.getKey());
+ ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+ mergeState.clear();
+ key.getValue().persist(mergeState);
+ }
+ }
+
StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
AbstractStateBackend.CheckpointStateOutputView out =
http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 642a16b..233131d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -36,22 +35,23 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
@@ -109,6 +109,13 @@ public class WindowOperatorTest {
expectedOutput.add(new Watermark(2999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processWatermark(new Watermark(initialTime + 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
expectedOutput.add(new Watermark(3999));
@@ -145,8 +152,8 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
- new SumReducer(),
- inputType.createSerializer(new ExecutionConfig()));
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
@@ -164,6 +171,7 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.setup();
testHarness.open();
testSlidingEventTimeWindows(testHarness);
@@ -182,21 +190,21 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
- inputType.createSerializer(new ExecutionConfig()));
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- stateDesc,
- new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
- EventTimeTrigger.create());
+ SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
+ EventTimeTrigger.create());
operator.setInputType(inputType, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
+ new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -206,7 +214,8 @@ public class WindowOperatorTest {
testHarness.close();
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+ // close() is called twice: once in the snapshot/restore step of the test and once above
+ Assert.assertEquals("Close was not called.", 2, closeCalled.get());
}
private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
@@ -237,6 +246,13 @@ public class WindowOperatorTest {
expectedOutput.add(new Watermark(1999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processWatermark(new Watermark(initialTime + 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
@@ -266,7 +282,7 @@ public class WindowOperatorTest {
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
}
- @Test
+ @Test
@SuppressWarnings("unchecked")
public void testTumblingEventTimeWindowsReduce() throws Exception {
closeCalled.set(0);
@@ -276,8 +292,8 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
- new SumReducer(),
- inputType.createSerializer(new ExecutionConfig()));
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
@@ -312,21 +328,21 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
- inputType.createSerializer(new ExecutionConfig()));
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- stateDesc,
- new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
- EventTimeTrigger.create());
+ TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
+ EventTimeTrigger.create());
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
+ new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -336,7 +352,8 @@ public class WindowOperatorTest {
testHarness.close();
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+ // close() is called twice: once in the snapshot/restore step of the test and once above
+ Assert.assertEquals("Close was not called.", 2, closeCalled.get());
}
@Test
@@ -379,6 +396,14 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), initialTime + 5501));
@@ -390,6 +415,7 @@ public class WindowOperatorTest {
expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-6", 10L, 5500L), initialTime + 5499));
expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), initialTime + 5499));
+
expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-20", 5501L, 9050L), initialTime + 9049));
expectedOutput.add(new Watermark(initialTime + 12000));
@@ -444,6 +470,13 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2500));
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
@@ -518,6 +551,14 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), initialTime + 1000));
+
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 6000));
@@ -610,6 +651,14 @@ public class WindowOperatorTest {
// add elements out-of-order
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 0));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 1000));
+
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), initialTime + 2500));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
@@ -637,8 +686,8 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
- new SumReducer(),
- inputType.createSerializer(new ExecutionConfig()));
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
@@ -727,8 +776,8 @@ public class WindowOperatorTest {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
- new SumReducer(),
- inputType.createSerializer(new ExecutionConfig()));
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
@@ -764,6 +813,14 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+
+ // do a snapshot, close and restore again
+ StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.close();
+ testHarness.setup();
+ testHarness.restore(snapshot, 10L);
+ testHarness.open();
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
@@ -793,7 +850,7 @@ public class WindowOperatorTest {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws Exception {
+ Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
}
}
@@ -818,9 +875,9 @@ public class WindowOperatorTest {
@Override
public void apply(String key,
- W window,
- Iterable<Tuple2<String, Integer>> input,
- Collector<Tuple2<String, Integer>> out) throws Exception {
+ W window,
+ Iterable<Tuple2<String, Integer>> input,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
if (!openCalled) {
fail("Open was not called");
http://git-wip-us.apache.org/repos/asf/flink/blob/f2f5bd5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 46e74e7..c5f983a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -69,6 +70,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final Object checkpointLock;
StreamTask<?, ?> mockTask;
+
+ /**
+ * Whether setup() was called on the operator. This is reset when calling close().
+ */
+ private boolean setupCalled = false;
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
@@ -151,20 +157,47 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}
+ * Calls
+ * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}.
*/
- public void open() throws Exception {
+ public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
+ setupCalled = true;
+ }
+ /**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
+ * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+ * if it was not called before.
+ */
+ public void open() throws Exception {
+ if (!setupCalled) {
+ setup();
+ }
operator.open();
}
/**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long, long)}.
+ */
+ public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
+ return operator.snapshotOperatorState(checkpointId, timestamp);
+ }
+
+ /**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState, long)}.
+ */
+ public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
+ operator.restoreState(snapshot, recoveryTimestamp);
+ }
+
+ /**
* Calls close and dispose on the operator.
*/
public void close() throws Exception {
operator.close();
operator.dispose();
+ setupCalled = false;
}
public void processElement(StreamRecord<IN> element) throws Exception {