You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:50 UTC
[6/6] flink git commit: [FLINK-9807][tests] Parameterize
EventTimeWindowCheckpointITCase & LocalRecoveryITCase
[FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase
This closes #6305.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc49801d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc49801d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc49801d
Branch: refs/heads/master
Commit: fc49801d4723413d3a09ecb85d60d39078056c68
Parents: 37abf46
Author: klion26 <qc...@gmail.com>
Authored: Sun Jul 8 11:52:41 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
...tractEventTimeWindowCheckpointingITCase.java | 889 ------------------
.../AbstractLocalRecoveryITCase.java | 99 --
...ckendEventTimeWindowCheckpointingITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 29 -
.../EventTimeAllWindowCheckpointingITCase.java | 2 +-
.../EventTimeWindowCheckpointingITCase.java | 931 +++++++++++++++++++
...ckendEventTimeWindowCheckpointingITCase.java | 29 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
.../checkpointing/LocalRecoveryHeapITCase.java | 30 -
.../test/checkpointing/LocalRecoveryITCase.java | 110 +++
.../LocalRecoveryRocksDBFullITCase.java | 30 -
.../LocalRecoveryRocksDBIncrementalITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
15 files changed, 1042 insertions(+), 1347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index df74450..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,889 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This verifies that checkpointing works correctly with event time windows. This is more
- * strict than {@link WindowCheckpointingITCase} because for event-time the contents
- * of the emitted windows are deterministic.
- *
- * <p>Split into multiple test classes in order to decrease the runtime per backend
- * and not run into CI infrastructure limits like no std output being emitted for
- * I/O heavy variants.
- */
-@SuppressWarnings("serial")
-public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger {
-
- private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
- private static final int PARALLELISM = 4;
-
- private TestingServer zkServer;
-
- public MiniClusterResource miniClusterResource;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Rule
- public TestName name = new TestName();
-
- private AbstractStateBackend stateBackend;
-
- enum StateBackendEnum {
- MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
- }
-
- protected abstract StateBackendEnum getStateBackend();
-
- protected final MiniClusterResource getMiniClusterResource() {
- return new MiniClusterResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfigurationSafe())
- .setNumberTaskManagers(2)
- .setNumberSlotsPerTaskManager(PARALLELISM / 2)
- .build());
- }
-
- private Configuration getConfigurationSafe() {
- try {
- return getConfiguration();
- } catch (Exception e) {
- throw new AssertionError("Could not initialize test.", e);
- }
- }
-
- private Configuration getConfiguration() throws Exception {
-
- // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
- // output for xxx seconds."</tt> messages
- System.out.println(
- "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
-
- // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
- StateBackendEnum stateBackendEnum = getStateBackend();
- if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
- zkServer = new TestingServer();
- zkServer.start();
- }
-
- Configuration config = createClusterConfig();
-
- switch (stateBackendEnum) {
- case MEM:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
- break;
- case FILE: {
- String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups, false);
- break;
- }
- case MEM_ASYNC:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
- break;
- case FILE_ASYNC: {
- String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups, true);
- break;
- }
- case ROCKSDB_FULLY_ASYNC: {
- String rocksDb = tempFolder.newFolder().getAbsolutePath();
- String backups = tempFolder.newFolder().getAbsolutePath();
- RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
- rdb.setDbStoragePath(rocksDb);
- this.stateBackend = rdb;
- break;
- }
- case ROCKSDB_INCREMENTAL:
- case ROCKSDB_INCREMENTAL_ZK: {
- String rocksDb = tempFolder.newFolder().getAbsolutePath();
- String backups = tempFolder.newFolder().getAbsolutePath();
- // we use the fs backend with small threshold here to test the behaviour with file
- // references, not self contained byte handles
- RocksDBStateBackend rdb =
- new RocksDBStateBackend(
- new FsStateBackend(
- new Path("file://" + backups).toUri(), 16),
- true);
- rdb.setDbStoragePath(rocksDb);
- this.stateBackend = rdb;
- break;
- }
- default:
- throw new IllegalStateException("No backend selected.");
- }
- return config;
- }
-
- protected Configuration createClusterConfig() throws IOException {
- TemporaryFolder temporaryFolder = new TemporaryFolder();
- temporaryFolder.create();
- final File haDir = temporaryFolder.newFolder();
-
- Configuration config = new Configuration();
- config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
- // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
- config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
-
- if (zkServer != null) {
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
- }
- return config;
- }
-
- @Before
- public void setupTestCluster() throws Exception {
- miniClusterResource = getMiniClusterResource();
- miniClusterResource.before();
- }
-
- @After
- public void stopTestCluster() throws IOException {
- if (miniClusterResource != null) {
- miniClusterResource.after();
- miniClusterResource = null;
- }
-
- if (zkServer != null) {
- zkServer.stop();
- zkServer = null;
- }
-
- //Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
- // for xxx seconds."</tt> messages.
- System.out.println(
- "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testTumblingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(PARALLELISM);
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(1 << 15);
- }
-
- public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setMaxParallelism(maxParallelism);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- private ValueState<Integer> count;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- count = getRuntimeContext().getState(
- new ValueStateDescriptor<>("count", Integer.class, 0));
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
-
- // the window count state starts with the key, so that we get
- // different count results for each key
- if (count.value() == 0) {
- count.update(tuple.<Long>getField(0).intValue());
- }
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- count.update(count.value() + 1);
- out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
- }
- })
- .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSlidingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int windowSlide = windowSlide();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setMaxParallelism(2 * PARALLELISM);
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedTumblingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedSlidingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int windowSlide = windowSlide();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
-
- // validate that the function has been opened properly
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener {
- private static volatile boolean failedBefore = false;
-
- private final int numKeys;
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
- this.numKeys = numKeys;
- this.numElementsToEmit = numElementsToEmitPerKey;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
-
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- for (long i = 0; i < numKeys; i++) {
- ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
- }
- ctx.emitWatermark(new Watermark(next));
- }
- }
- else {
-
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
-
- @Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.numElementsEmitted);
- }
-
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.numElementsEmitted = state.get(0);
- }
-
- public static void reset() {
- failedBefore = false;
- }
- }
-
- private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private ValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
- // it can happen that a checkpoint happens when the complete success state is
- // already set. In that case we restart with the final state and would never
- // finish because no more elements arrive.
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- if (seenAll) {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The sink must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
- // the sum should be "sum (start .. end-1)"
-
- int expectedSum = 0;
- for (long i = value.f1; i < value.f2; i++) {
- // only sum up positive vals, to filter out the negative start of the
- // first sliding windows
- if (i > 0) {
- expectedSum += i;
- }
- }
-
- assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- windowCounts.putAll(state.get(0));
- }
- }
-
- // Sink for validating the stateful window counts
- private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private CountValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The source must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- // verify the contents of that window, the contents should be:
- // (key + num windows so far)
-
- assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.windowCounts.putAll(state.get(0));
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class IntType {
-
- public int value;
-
- public IntType() {}
-
- public IntType(int value) {
- this.value = value;
- }
- }
-
- protected int numElementsPerKey() {
- return 300;
- }
-
- protected int windowSize() {
- return 100;
- }
-
- protected int windowSlide() {
- return 100;
- }
-
- protected int numKeys() {
- return 20;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
deleted file mode 100644
index 4e454d7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
-
-/**
- * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
- * to use local recovery.
- *
- * <p>TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}.
- */
-public abstract class AbstractLocalRecoveryITCase extends TestLogger {
-
- private final StateBackendEnum backendEnum;
- private final boolean localRecoveryEnabled;
-
- @Rule
- public TestName testName = new TestName();
-
- AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
- this.backendEnum = backendEnum;
- this.localRecoveryEnabled = localRecoveryEnabled;
- }
-
- @Test
- public final void executeTest() throws Exception {
- AbstractEventTimeWindowCheckpointingITCase.tempFolder.create();
- AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
- new AbstractEventTimeWindowCheckpointingITCase() {
- @Override
- protected StateBackendEnum getStateBackend() {
- return backendEnum;
- }
-
- @Override
- protected Configuration createClusterConfig() throws IOException {
- Configuration config = super.createClusterConfig();
-
- config.setBoolean(
- CheckpointingOptions.LOCAL_RECOVERY,
- localRecoveryEnabled);
-
- return config;
- }
- };
-
- executeTest(windowChkITCase);
- }
-
- private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
- delegate.name = testName;
- try {
- delegate.setupTestCluster();
- try {
- delegate.testTumblingTimeWindow();
- delegate.stopTestCluster();
- } catch (Exception e) {
- delegate.stopTestCluster();
- }
-
- delegate.setupTestCluster();
- try {
- delegate.testSlidingTimeWindow();
- delegate.stopTestCluster();
- } catch (Exception e) {
- delegate.stopTestCluster();
- }
- } finally {
- delegate.tempFolder.delete();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index c4b06d4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous file backend.
- */
-public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.FILE_ASYNC;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 2cc5b01..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous memory backend.
- */
-public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.MEM_ASYNC;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index d05bafb..9e14b26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
*
- * <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
+ * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..c3d93d7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ *
+ * <p>Split into multiple test classes in order to decrease the runtime per backend
+ * and not run into CI infrastructure limits like no std output being emitted for
+ * I/O heavy variants.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+ private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
+ private static final int PARALLELISM = 4;
+
+ private TestingServer zkServer;
+
+ public MiniClusterResource miniClusterResource;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Rule
+ public TestName name = new TestName();
+
+ private AbstractStateBackend stateBackend;
+
+ @Parameterized.Parameter
+ public StateBackendEnum stateBackendEnum;
+
+ enum StateBackendEnum {
+ MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
+ }
+
+ @Parameterized.Parameters(name = "statebackend type ={0}")
+ public static Collection<StateBackendEnum> parameter() {
+ return Arrays.asList(StateBackendEnum.values());
+ }
+
+ protected StateBackendEnum getStateBackend() {
+ return this.stateBackendEnum;
+ }
+
+ protected final MiniClusterResource getMiniClusterResource() {
+ return new MiniClusterResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfigurationSafe())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(PARALLELISM / 2)
+ .build());
+ }
+
+ private Configuration getConfigurationSafe() {
+ try {
+ return getConfiguration();
+ } catch (Exception e) {
+ throw new AssertionError("Could not initialize test.", e);
+ }
+ }
+
+ private Configuration getConfiguration() throws Exception {
+
+ // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
+ // output for xxx seconds."</tt> messages
+ System.out.println(
+ "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+
+ // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+ StateBackendEnum stateBackendEnum = getStateBackend();
+ if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+ zkServer = new TestingServer();
+ zkServer.start();
+ }
+
+ Configuration config = createClusterConfig();
+
+ switch (stateBackendEnum) {
+ case MEM:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
+ break;
+ case FILE: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups, false);
+ break;
+ }
+ case MEM_ASYNC:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+ break;
+ case FILE_ASYNC: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups, true);
+ break;
+ }
+ case ROCKSDB_FULLY_ASYNC: {
+ String rocksDb = tempFolder.newFolder().getAbsolutePath();
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
+ rdb.setDbStoragePath(rocksDb);
+ this.stateBackend = rdb;
+ break;
+ }
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK: {
+ String rocksDb = tempFolder.newFolder().getAbsolutePath();
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ // we use the fs backend with small threshold here to test the behaviour with file
+ // references, not self contained byte handles
+ RocksDBStateBackend rdb =
+ new RocksDBStateBackend(
+ new FsStateBackend(
+ new Path("file://" + backups).toUri(), 16),
+ true);
+ rdb.setDbStoragePath(rocksDb);
+ this.stateBackend = rdb;
+ break;
+ }
+ default:
+ throw new IllegalStateException("No backend selected.");
+ }
+ return config;
+ }
+
+ protected Configuration createClusterConfig() throws IOException {
+ TemporaryFolder temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ final File haDir = temporaryFolder.newFolder();
+
+ Configuration config = new Configuration();
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
+ // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
+ config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+ config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
+
+ if (zkServer != null) {
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+ }
+ return config;
+ }
+
+ @Before
+ public void setupTestCluster() throws Exception {
+ miniClusterResource = getMiniClusterResource();
+ miniClusterResource.before();
+ }
+
+ @After
+ public void stopTestCluster() throws IOException {
+ if (miniClusterResource != null) {
+ miniClusterResource.after();
+ miniClusterResource = null;
+ }
+
+ if (zkServer != null) {
+ zkServer.stop();
+ zkServer = null;
+ }
+
+ //Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
+ // for xxx seconds."</tt> messages.
+ System.out.println(
+ "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTumblingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(PARALLELISM);
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(1 << 15);
+ }
+
+ public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setMaxParallelism(maxParallelism);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ private ValueState<Integer> count;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ count = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("count", Integer.class, 0));
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+ // the window count state starts with the key, so that we get
+ // different count results for each key
+ if (count.value() == 0) {
+ count.update(tuple.<Long>getField(0).intValue());
+ }
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ count.update(count.value() + 1);
+ out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+ }
+ })
+ .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlidingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setMaxParallelism(2 * PARALLELISM);
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedTumblingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedSlidingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+ implements ListCheckpointed<Integer>, CheckpointListener {
+ private static volatile boolean failedBefore = false;
+
+ private final int numKeys;
+ private final int numElementsToEmit;
+ private final int failureAfterNumElements;
+
+ private volatile int numElementsEmitted;
+ private volatile int numSuccessfulCheckpoints;
+ private volatile boolean running = true;
+
+ private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+ this.numKeys = numKeys;
+ this.numElementsToEmit = numElementsToEmitPerKey;
+ this.failureAfterNumElements = failureAfterNumElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+ // we loop longer than we have elements, to permit delayed checkpoints
+ // to still cause a failure
+ while (running) {
+
+ if (!failedBefore) {
+ // delay a bit, if we have not failed before
+ Thread.sleep(1);
+ if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+ // cause a failure if we have not failed before and have reached
+ // enough completed checkpoints and elements
+ failedBefore = true;
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (numElementsEmitted < numElementsToEmit &&
+ (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
+ // the function failed before, or we are in the elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ int next = numElementsEmitted++;
+ for (long i = 0; i < numKeys; i++) {
+ ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+ }
+ ctx.emitWatermark(new Watermark(next));
+ }
+ }
+ else {
+
+ // if our work is done, delay a bit to prevent busy waiting
+ Thread.sleep(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numSuccessfulCheckpoints++;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsEmitted);
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsEmitted = state.get(0);
+ }
+
+ public static void reset() {
+ failedBefore = false;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements ListCheckpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private ValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+ // it can happen that a checkpoint happens when the complete success state is
+ // already set. In that case we restart with the final state and would never
+ // finish because no more elements arrive.
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount != numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ if (seenAll) {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The sink must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+ // the sum should be "sum (start .. end-1)"
+
+ int expectedSum = 0;
+ for (long i = value.f1; i < value.f2; i++) {
+ // only sum up positive vals, to filter out the negative start of the
+ // first sliding windows
+ if (i > 0) {
+ expectedSum += i;
+ }
+ }
+
+ assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
+ }
+
+ @Override
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ windowCounts.putAll(state.get(0));
+ }
+ }
+
+ // Sink for validating the stateful window counts
+ private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements ListCheckpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private CountValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The source must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ // verify the contents of that window, the contents should be:
+ // (key + num windows so far)
+
+ assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
+ }
+
+ @Override
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.windowCounts.putAll(state.get(0));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class IntType {
+
+ public int value;
+
+ public IntType() {}
+
+ public IntType(int value) {
+ this.value = value;
+ }
+ }
+
+ private int numElementsPerKey() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 3000;
+ default:
+ return 300;
+ }
+ }
+
+ private int windowSize() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 1000;
+ default:
+ return 100;
+ }
+ }
+
+ private int windowSlide() {
+ return 100;
+ }
+
+ private int numKeys() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 100;
+ default:
+ return 20;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index eab6153..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for file backend.
- */
-public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.FILE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index ed43ad6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
- }
-
- @Override
- protected int numElementsPerKey() {
- return 3000;
- }
-
- @Override
- protected int windowSize() {
- return 1000;
- }
-
- @Override
- protected int windowSlide() {
- return 100;
- }
-
- @Override
- protected int numKeys() {
- return 100;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 1276a00..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.ROCKSDB_INCREMENTAL;
- }
-
- @Override
- protected int numElementsPerKey() {
- return 3000;
- }
-
- @Override
- protected int windowSize() {
- return 1000;
- }
-
- @Override
- protected int windowSlide() {
- return 100;
- }
-
- @Override
- protected int numKeys() {
- return 100;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
deleted file mode 100644
index 6749366..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-
-/**
- * Tests file-based local recovery with the HeapBackend.
- */
-public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryHeapITCase() {
- super(FILE_ASYNC, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
new file mode 100644
index 0000000..5374765
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+
+/**
+ * This test delegates to instances of {@link EventTimeWindowCheckpointingITCase} that have been reconfigured
+ * to use local recovery.
+ *
+ * <p>TODO: This class must be refactored to properly extend {@link EventTimeWindowCheckpointingITCase}.
+ */
+@RunWith(Parameterized.class)
+public class LocalRecoveryITCase extends TestLogger {
+
+ private final boolean localRecoveryEnabled = true;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameterized.Parameter
+ public StateBackendEnum backendEnum;
+
+ @Parameterized.Parameters(name = "statebackend type ={0}")
+ public static Collection<StateBackendEnum> parameter() {
+ return Arrays.asList(ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL_ZK, FILE_ASYNC);
+ }
+
+ @Test
+ public final void executeTest() throws Exception {
+ EventTimeWindowCheckpointingITCase.tempFolder.create();
+ EventTimeWindowCheckpointingITCase windowChkITCase =
+ new EventTimeWindowCheckpointingITCase() {
+
+ @Override
+ protected StateBackendEnum getStateBackend() {
+ return backendEnum;
+ }
+
+ @Override
+ protected Configuration createClusterConfig() throws IOException {
+ Configuration config = super.createClusterConfig();
+
+ config.setBoolean(
+ CheckpointingOptions.LOCAL_RECOVERY,
+ localRecoveryEnabled);
+
+ return config;
+ }
+ };
+
+ executeTest(windowChkITCase);
+ }
+
+ private void executeTest(EventTimeWindowCheckpointingITCase delegate) throws Exception {
+ delegate.name = testName;
+ try {
+ delegate.setupTestCluster();
+ try {
+ delegate.testTumblingTimeWindow();
+ delegate.stopTestCluster();
+ } catch (Exception e) {
+ delegate.stopTestCluster();
+ }
+
+ delegate.setupTestCluster();
+ try {
+ delegate.testSlidingTimeWindow();
+ delegate.stopTestCluster();
+ } catch (Exception e) {
+ delegate.stopTestCluster();
+ }
+ } finally {
+ delegate.tempFolder.delete();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
deleted file mode 100644
index 2d12ae2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend.
- */
-public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryRocksDBFullITCase() {
- super(ROCKSDB_FULLY_ASYNC, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
deleted file mode 100644
index 718d4a3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend and incremental checkpointing enabled.
- */
-public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryRocksDBIncrementalITCase() {
- super(ROCKSDB_INCREMENTAL_ZK, true);
- }
-}