You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/20 13:58:17 UTC
flink git commit: [FLINK-9902][tests] Improve and refactor window checkpointing IT cases
Repository: flink
Updated Branches:
refs/heads/master 01cf808ee -> d309e61e2
[FLINK-9902][tests] Improve and refactor window checkpointing IT cases
This closes #6376.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d309e61e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d309e61e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d309e61e
Branch: refs/heads/master
Commit: d309e61e2bae170872b43cf60bd2fd9fef77814c
Parents: 01cf808
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jul 18 23:29:56 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 20 15:57:49 2018 +0200
----------------------------------------------------------------------
.../EventTimeAllWindowCheckpointingITCase.java | 296 ++-------------
.../EventTimeWindowCheckpointingITCase.java | 375 +++++--------------
.../WindowCheckpointingITCase.java | 229 +++--------
.../test/checkpointing/utils/FailingSource.java | 155 ++++++++
.../flink/test/checkpointing/utils/IntType.java | 38 ++
.../checkpointing/utils/ValidatingSink.java | 128 +++++++
.../flink/test/util/SuccessException.java | 2 +-
7 files changed, 505 insertions(+), 718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 9e14b26..5dc2aa0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -26,31 +26,23 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -88,20 +80,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(numKeys,
- numElementsPerKey,
- numElementsPerKey / 3))
+ .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
@@ -133,9 +122,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+ new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -149,7 +141,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
final int windowSize = 1000;
final int windowSlide = 100;
final int numKeys = 1;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -160,7 +151,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
@@ -192,9 +183,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+ new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide)))
+ .setParallelism(1);
- tryExecute(env, "Sliding Window Test");
+ env.execute("Sliding Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -207,20 +201,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(numKeys,
- numElementsPerKey,
- numElementsPerKey / 3))
+ .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
.reduce(
@@ -261,9 +252,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+ new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -276,20 +270,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(numKeys,
- numElementsPerKey,
- numElementsPerKey / 3))
+ .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
@@ -329,9 +320,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+ new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -345,20 +339,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
final int windowSize = 1000;
final int windowSlide = 100;
final int numKeys = 1;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(numKeys,
- numElementsPerKey,
- numElementsPerKey / 3))
+ .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS),
Time.of(windowSlide, MILLISECONDS))
@@ -400,229 +391,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+ new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide)))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener {
- private static volatile boolean failedBefore = false;
-
- private final int numKeys;
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
- this.numKeys = numKeys;
- this.numElementsToEmit = numElementsToEmitPerKey;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
-
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- for (long i = 0; i < numKeys; i++) {
- ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next);
- }
- ctx.emitWatermark(new Watermark(next));
- }
- }
- else {
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
-
- public static void reset() {
- failedBefore = false;
- }
-
- @Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.numElementsEmitted);
- }
-
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.numElementsEmitted = state.get(0);
- }
- }
-
- private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private ValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
- // it can happen that a checkpoint happens when the complete success state is
- // already set. In that case we restart with the final state and would never
- // finish because no more elements arrive.
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- if (seenAll) {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The source must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
- // the sum should be "sum (start .. end-1)"
-
- int expectedSum = 0;
- for (long i = value.f1; i < value.f2; i++) {
- // only sum up positive vals, to filter out the negative start of the
- // first sliding windows
- if (i > 0) {
- expectedSum += i;
- }
- }
-
- assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.windowCounts.putAll(state.get(0));
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Custom boxed integer type.
- */
- public static class IntType {
-
- public int value;
-
- public IntType() {}
-
- public IntType(int value) {
- this.value = value;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index c3d93d7..e9a2e45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -32,21 +32,20 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -65,19 +64,10 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -255,20 +245,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -299,12 +288,17 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
sum += value.f1.value;
key = value.f0;
}
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+
+ final Tuple4<Long, Long, Long, IntType> result =
+ new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+ out.collect(result);
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -326,7 +320,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,13 +327,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
env.setMaxParallelism(maxParallelism);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -378,9 +371,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
}
})
- .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new CountingSinkValidatorUpdateFun(),
+ new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -394,7 +389,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final int windowSize = windowSize();
final int windowSlide = windowSlide();
final int numKeys = numKeys();
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -402,13 +396,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -439,12 +433,16 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
sum += value.f1.value;
key = value.f0;
}
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ final Tuple4<Long, Long, Long, IntType> output =
+ new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+ out.collect(output);
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -457,20 +455,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(windowSize, MILLISECONDS))
@@ -505,16 +502,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
assertTrue(open);
for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
+ final Tuple4<Long, Long, Long, IntType> output = new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1);
+ out.collect(output);
}
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -528,20 +528,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
final int windowSize = windowSize();
final int windowSlide = windowSlide();
final int numKeys = numKeys();
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);
env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -585,9 +584,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+ .addSink(new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ env.execute("Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -599,151 +600,42 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
// Utilities
// ------------------------------------------------------------------------
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener {
- private static volatile boolean failedBefore = false;
-
- private final int numKeys;
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
- this.numKeys = numKeys;
- this.numElementsToEmit = numElementsToEmitPerKey;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
-
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- for (long i = 0; i < numKeys; i++) {
- ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
- }
- ctx.emitWatermark(new Watermark(next));
- }
- }
- else {
-
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
+ /**
+ * For validating the stateful window counts.
+ */
+ static class CountingSinkValidatorUpdateFun
+ implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
@Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.numElementsEmitted);
- }
+ public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.numElementsEmitted = state.get(0);
- }
+ windowCounts.merge(value.f0, 1, (a, b) -> a + b);
- public static void reset() {
- failedBefore = false;
+ // verify the contents of that window, the contents should be:
+ // (key + num windows so far)
+ assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
}
}
- private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
+ //------------------------------------
- private ValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
+ static class SinkValidatorUpdateFun implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ private final int elementsPerKey;
- // it can happen that a checkpoint happens when the complete success state is
- // already set. In that case we restart with the final state and would never
- // finish because no more elements arrive.
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- if (seenAll) {
- throw new SuccessException();
- }
- }
+ SinkValidatorUpdateFun(int elementsPerKey) {
+ this.elementsPerKey = elementsPerKey;
}
@Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The sink must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
+ public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
// the sum should be "sum (start .. end-1)"
int expectedSum = 0;
- for (long i = value.f1; i < value.f2; i++) {
+ // we shorten the range if it goes beyond elementsPerKey, because those are "incomplete" sliding windows
+ long countUntil = Math.min(value.f2, elementsPerKey);
+ for (long i = value.f1; i < countUntil; i++) {
// only sum up positive vals, to filter out the negative start of the
// first sliding windows
if (i > 0) {
@@ -753,142 +645,55 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- windowCounts.putAll(state.get(0));
+ windowCounts.merge(value.f0, 1, (val, increment) -> val + increment);
}
}
- // Sink for validating the stateful window counts
- private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+ static class SinkValidatorCheckFun implements ValidatingSink.ResultChecker {
private final int numKeys;
private final int numWindowsExpected;
- private CountValidatingSink(int numKeys, int numWindowsExpected) {
+ SinkValidatorCheckFun(int numKeys, int elementsPerKey, int elementsPerWindow) {
this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
+ this.numWindowsExpected = elementsPerKey / elementsPerWindow;
}
@Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
+ public boolean checkResult(Map<Long, Integer> windowCounts) {
if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
+ for (Integer windowCount : windowCounts.values()) {
if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
+ return false;
}
}
+ return true;
}
- assertTrue("The source must see all expected windows.", seenAll);
+ return false;
}
+ }
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- // verify the contents of that window, the contents should be:
- // (key + num windows so far)
-
- assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
+ static class KeyedEventTimeGenerator implements FailingSource.EventEmittingGenerator {
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
+ private final int keyUniverseSize;
+ private final int watermarkTrailing;
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
+ public KeyedEventTimeGenerator(int keyUniverseSize, int numElementsPerWindow) {
+ this.keyUniverseSize = keyUniverseSize;
+ // we let the watermark trail a bit behind, so that there can be in-flight timers that require
+ // checkpointing; this way our tests also cover correct timer snapshots.
+ this.watermarkTrailing = 4 * numElementsPerWindow / 3;
}
@Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
+ final IntType intTypeNext = new IntType(eventSequenceNo);
+ for (long i = 0; i < keyUniverseSize; i++) {
+ final Tuple2<Long, IntType> generatedEvent = new Tuple2<>(i, intTypeNext);
+ ctx.collectWithTimestamp(generatedEvent, eventSequenceNo);
}
- this.windowCounts.putAll(state.get(0));
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class IntType {
-
- public int value;
-
- public IntType() {}
- public IntType(int value) {
- this.value = value;
+ ctx.emitWatermark(new Watermark(eventSequenceNo - watermarkTrailing));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index b6163e8..b0e2967 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,18 +25,17 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -47,9 +46,7 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -92,7 +89,6 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testTumblingProcessingTimeWindow() {
final int numElements = 3000;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -100,11 +96,14 @@ public class WindowCheckpointingITCase extends TestLogger {
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 1);
+
env
- .addSource(new FailingSource(numElements, numElements / 3))
+ .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(100, MILLISECONDS))
@@ -130,11 +129,12 @@ public class WindowCheckpointingITCase extends TestLogger {
for (Tuple2<Long, IntType> value : values) {
assertEquals(value.f0.intValue(), value.f1.value);
- out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
+ out.collect(new Tuple2<>(value.f0, new IntType(1)));
}
}
})
- .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
+ .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+ .setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -147,7 +147,6 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testSlidingProcessingTimeWindow() {
final int numElements = 3000;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -155,11 +154,12 @@ public class WindowCheckpointingITCase extends TestLogger {
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
-
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 3);
env
- .addSource(new FailingSource(numElements, numElements / 3))
+ .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
@@ -185,13 +185,14 @@ public class WindowCheckpointingITCase extends TestLogger {
for (Tuple2<Long, IntType> value : values) {
assertEquals(value.f0.intValue(), value.f1.value);
- out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1)));
+ out.collect(new Tuple2<>(value.f0, new IntType(1)));
}
}
})
- .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
+ .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ tryExecute(env, "Sliding Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -202,7 +203,6 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testAggregatingTumblingProcessingTimeWindow() {
final int numElements = 3000;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -210,11 +210,12 @@ public class WindowCheckpointingITCase extends TestLogger {
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
-
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 1);
env
- .addSource(new FailingSource(numElements, numElements / 3))
+ .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
@Override
public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
@@ -234,9 +235,10 @@ public class WindowCheckpointingITCase extends TestLogger {
return new Tuple2<>(a.f0, new IntType(1));
}
})
- .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
+ .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ tryExecute(env, "Aggregating Tumbling Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -247,7 +249,6 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testAggregatingSlidingProcessingTimeWindow() {
final int numElements = 3000;
- FailingSource.reset();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -255,11 +256,12 @@ public class WindowCheckpointingITCase extends TestLogger {
env.setStreamTimeCharacteristic(timeCharacteristic);
env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.getConfig().disableSysoutLogging();
-
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 3);
env
- .addSource(new FailingSource(numElements, numElements / 3))
+ .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic))
.map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
@Override
public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
@@ -278,9 +280,10 @@ public class WindowCheckpointingITCase extends TestLogger {
return new Tuple2<>(a.f0, new IntType(1));
}
})
- .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
+ .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+ .setParallelism(1);
- tryExecute(env, "Tumbling Window Test");
+ tryExecute(env, "Aggregating Sliding Window Test");
}
catch (Exception e) {
e.printStackTrace();
@@ -292,152 +295,50 @@ public class WindowCheckpointingITCase extends TestLogger {
// Utilities
// ------------------------------------------------------------------------
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener {
- private static volatile boolean failedBefore = false;
-
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numElementsToEmit, int failureAfterNumElements) {
- this.numElementsToEmit = numElementsToEmit;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- ctx.collect(new Tuple2<Long, IntType>((long) next, new IntType(next)));
- }
- } else {
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(10);
- }
- }
- }
+ static class Generator implements FailingSource.EventEmittingGenerator {
@Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
-
- @Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.numElementsEmitted);
- }
-
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.numElementsEmitted = state.get(0);
- }
-
- public static void reset() {
- failedBefore = false;
+ public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
+ ctx.collect(new Tuple2<>((long) eventSequenceNo, new IntType(eventSequenceNo)));
}
}
- private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> counts = new HashMap<>();
+ static class SinkValidatorUpdaterAndChecker
+ implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>, ValidatingSink.ResultChecker {
private final int elementCountExpected;
private final int countPerElementExpected;
- private int aggCount;
-
- private ValidatingSink(int elementCountExpected, int countPerElementExpected) {
+ SinkValidatorUpdaterAndChecker(int elementCountExpected, int countPerElementExpected) {
this.elementCountExpected = elementCountExpected;
this.countPerElementExpected = countPerElementExpected;
}
@Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- checkSuccess();
+ public void updateCount(Tuple2<Long, IntType> value, Map<Long, Integer> windowCounts) {
+ windowCounts.merge(value.f0, value.f1.value, (a, b) -> a + b);
}
@Override
- public void invoke(Tuple2<Long, IntType> value) throws Exception {
- Integer curr = counts.get(value.f0);
- if (curr != null) {
- counts.put(value.f0, curr + value.f1.value);
- }
- else {
- counts.put(value.f0, value.f1.value);
- }
+ public boolean checkResult(Map<Long, Integer> windowCounts) {
+ int aggCount = 0;
- // check if we have seen all we expect
- aggCount += value.f1.value;
- checkSuccess();
- }
-
- private void checkSuccess() throws SuccessException {
- if (aggCount >= elementCountExpected * countPerElementExpected) {
- // we are done. validate
- assertEquals(elementCountExpected, counts.size());
-
- for (Integer i : counts.values()) {
- assertEquals(countPerElementExpected, i.intValue());
- }
-
- // exit
- throw new SuccessException();
+ for (Integer i : windowCounts.values()) {
+ aggCount += i;
}
- }
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.counts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ if (aggCount < elementCountExpected * countPerElementExpected
+ || elementCountExpected != windowCounts.size()) {
+ return false;
}
- this.counts.putAll(state.get(0));
- for (Integer i : state.get(0).values()) {
- this.aggCount += i;
+ for (int i : windowCounts.values()) {
+ if (countPerElementExpected != i) {
+ return false;
+ }
}
+
+ return true;
}
}
@@ -452,22 +353,4 @@ public class WindowCheckpointingITCase extends TestLogger {
new TimeCharacteristic[]{TimeCharacteristic.IngestionTime}
);
}
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * POJO with int value.
- */
- public static class IntType {
-
- public int value;
-
- public IntType() {}
-
- public IntType(int value) {
- this.value = value;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
new file mode 100644
index 0000000..822d73b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial failures.
+ *
+ * <p>The source invokes its {@link EventEmittingGenerator} a fixed number of times and keeps the
+ * number of invocations done so far as checkpointed state. On the first execution attempt,
+ * subtask 0 throws an artificial exception once half of the invocations are done, but only after
+ * a checkpoint that was taken with more than a quarter of the invocations has completed.
+ */
+public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+	implements ListCheckpointed<Integer>, CheckpointListener {
+
+	/**
+	 * Function to generate and emit the test events (and watermarks if required).
+	 */
+	@FunctionalInterface
+	public interface EventEmittingGenerator extends Serializable {
+
+		/**
+		 * Emits the event(s) for one generator invocation.
+		 *
+		 * @param ctx source context to emit through; the source calls this method while holding
+		 *            the checkpoint lock.
+		 * @param eventSequenceNo number of this invocation, starting at 0 and continuing from the
+		 *                        restored count after a recovery.
+		 */
+		void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
+	}
+
+	/** Checkpoint status while no checkpoint with enough state has been snapshotted yet. */
+	private static final long INITIAL = Long.MIN_VALUE;
+
+	/**
+	 * Checkpoint status once the awaited checkpoint has completed. This must differ from
+	 * {@link #INITIAL}: with equal values the source would fail without waiting for any
+	 * checkpoint, and {@link #snapshotState(long, long)} could re-arm the status afterwards.
+	 * NOTE(review): assumes checkpoint ids never equal either sentinel value.
+	 */
+	private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MAX_VALUE;
+
+	@Nonnull
+	private final EventEmittingGenerator eventEmittingGenerator;
+	private final int expectedEmitCalls;
+	private final int failureAfterNumElements;
+	private final boolean usingProcessingTime;
+
+	/** One of {@link #INITIAL}, the id of the awaited checkpoint, or {@link #STATEFUL_CHECKPOINT_COMPLETED}. */
+	private final AtomicLong checkpointStatus;
+
+	/** Number of generator invocations done so far; this is the checkpointed state. */
+	private int emitCallCount;
+
+	/** Cleared only by {@link #cancel()}. */
+	private volatile boolean running;
+
+	/**
+	 * Creates a source for {@link TimeCharacteristic#EventTime}.
+	 *
+	 * @param eventEmittingGenerator generator that emits the events.
+	 * @param numberOfGeneratorInvocations total number of times the generator is invoked.
+	 */
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations) {
+		this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
+	}
+
+	/**
+	 * Creates a source that fails after half of the generator invocations.
+	 *
+	 * @param eventEmittingGenerator generator that emits the events.
+	 * @param numberOfGeneratorInvocations total number of times the generator is invoked.
+	 * @param timeCharacteristic time characteristic of the job; with
+	 *                           {@link TimeCharacteristic#ProcessingTime} the source stays alive
+	 *                           after the last invocation until it is cancelled.
+	 */
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations,
+		@Nonnull TimeCharacteristic timeCharacteristic) {
+		this.eventEmittingGenerator = eventEmittingGenerator;
+		this.running = true;
+		this.emitCallCount = 0;
+		this.expectedEmitCalls = numberOfGeneratorInvocations;
+		this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+		this.checkpointStatus = new AtomicLong(INITIAL);
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		// non-parallel source
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+
+		final RuntimeContext runtimeContext = getRuntimeContext();
+		// detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt)
+		final boolean failThisTask =
+			runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0;
+
+		// The emit bound is part of the loop condition instead of being folded into 'running':
+		// 'running' must only be written by cancel(), so that a concurrent cancel() cannot be lost
+		// and the flag is still meaningful for the idle loop below.
+		while (running && emitCallCount < expectedEmitCalls) {
+
+			// emit under the checkpoint lock, so that a snapshot sees a count consistent with the emitted events
+			synchronized (ctx.getCheckpointLock()) {
+				eventEmittingGenerator.emitEvent(ctx, emitCallCount++);
+			}
+
+			if (emitCallCount < failureAfterNumElements) {
+				// throttle the elements before the failure point
+				Thread.sleep(1);
+			} else if (failThisTask && emitCallCount == failureAfterNumElements) {
+				// wait for a pending checkpoint that fulfills our requirements if needed
+				while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) {
+					Thread.sleep(1);
+				}
+				throw new Exception("Artificial Failure");
+			}
+		}
+
+		if (usingProcessingTime) {
+			// all events are emitted; stay alive until cancelled
+			// (presumably so that pending processing-time windows can still fire - TODO confirm)
+			while (running) {
+				Thread.sleep(10);
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// This will unblock the task for failing, if this is the checkpoint we are waiting for
+		checkpointStatus.compareAndSet(checkpointId, STATEFUL_CHECKPOINT_COMPLETED);
+	}
+
+	@Override
+	public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+		// We accept a checkpoint as basis if it should have a "decent amount" of state
+		if (emitCallCount > failureAfterNumElements / 2) {
+			// From now on we are waiting for the notification that this checkpoint has completed.
+			checkpointStatus.compareAndSet(INITIAL, checkpointId);
+		}
+		return Collections.singletonList(this.emitCallCount);
+	}
+
+	@Override
+	public void restoreState(List<Integer> state) throws Exception {
+		if (state.isEmpty() || state.size() > 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		}
+		this.emitCallCount = state.get(0);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
new file mode 100644
index 0000000..3bc4ea0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+/**
+ * Minimal mutable holder of a single {@code int}, used as the value type of the events in the
+ * window checkpointing tests.
+ */
+public class IntType {
+
+	/** The wrapped value. */
+	public int value;
+
+	/**
+	 * Creates a holder for the given value.
+	 *
+	 * @param value the int to wrap.
+	 */
+	public IntType(int value) {
+		this.value = value;
+	}
+
+	/** Renders the holder as {@code IntType{value=N}}. */
+	@Override
+	public String toString() {
+		final StringBuilder text = new StringBuilder("IntType{value=");
+		text.append(value);
+		text.append('}');
+		return text.toString();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
new file mode 100644
index 0000000..b352738
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Generalized sink for validation of window checkpointing IT cases.
+ */
+public class ValidatingSink<T> extends RichSinkFunction<T>
+		implements ListCheckpointed<HashMap<Long, Integer>> {
+
+	/** Predicate that decides whether the collected window counts are the expected result. */
+	@FunctionalInterface
+	public interface ResultChecker extends Serializable {
+		boolean checkResult(Map<Long, Integer> windowCounts);
+	}
+
+	/**
+	 * Callback that folds one incoming element into the window counts.
+	 *
+	 * @param <T> type of the incoming element.
+	 */
+	public interface CountUpdater<T> extends Serializable {
+		void updateCount(T update, Map<Long, Integer> windowCounts);
+	}
+
+	@Nonnull
+	private final CountUpdater<T> countUpdater;
+
+	@Nonnull
+	private final ResultChecker resultChecker;
+
+	@Nonnull
+	private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+	private final boolean usingProcessingTime;
+
+	/** Creates a sink configured for {@link TimeCharacteristic#EventTime}. */
+	public ValidatingSink(
+			@Nonnull CountUpdater<T> countUpdater,
+			@Nonnull ResultChecker resultChecker) {
+		this(countUpdater, resultChecker, TimeCharacteristic.EventTime);
+	}
+
+	/** Creates a sink that throws {@link SuccessException} on success only for processing time. */
+	public ValidatingSink(
+			@Nonnull CountUpdater<T> countUpdater,
+			@Nonnull ResultChecker resultChecker,
+			@Nonnull TimeCharacteristic timeCharacteristic) {
+		this.countUpdater = countUpdater;
+		this.resultChecker = resultChecker;
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		// the sink must run with a parallelism of exactly 1
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		succeedEarlyIfComplete();
+	}
+
+	@Override
+	public void invoke(T value, Context context) throws Exception {
+		countUpdater.updateCount(value, windowCounts);
+		succeedEarlyIfComplete();
+	}
+
+	@Override
+	public void close() {
+		if (!resultChecker.checkResult(windowCounts)) {
+			throw new AssertionError("Test failed check.");
+		}
+		if (usingProcessingTime) {
+			throw new SuccessException();
+		}
+	}
+
+	@Override
+	public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+		return Collections.singletonList(windowCounts);
+	}
+
+	@Override
+	public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+		if (state.size() != 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		}
+		windowCounts.putAll(state.get(0));
+	}
+
+	/** In processing-time mode, throws {@link SuccessException} once the checker accepts the counts. */
+	private void succeedEarlyIfComplete() {
+		if (usingProcessingTime && resultChecker.checkResult(windowCounts)) {
+			throw new SuccessException();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
index 22ac02b..d8e2a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
@@ -21,6 +21,6 @@ package org.apache.flink.test.util;
/**
* Exception that is thrown to terminate a program and indicate success.
*/
-public class SuccessException extends Exception {
+public class SuccessException extends RuntimeException { // unchecked: thrown from methods without a throws clause, e.g. ValidatingSink#close()
private static final long serialVersionUID = -7011865671593955887L;
}