You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/04 17:49:16 UTC
flink git commit: [FLINK-5016] [checkpointing] Split
EventTimeWindowCheckpointingITCase
Repository: flink
Updated Branches:
refs/heads/master 7fbd757dd -> e288617f9
[FLINK-5016] [checkpointing] Split EventTimeWindowCheckpointingITCase
Split this EventTimeWindowCheckpointingITCase up into multiple tests
in order to not run into the no output to stdout CI limit (currently
set to 5 minutes).
This closes #2933.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e288617f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e288617f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e288617f
Branch: refs/heads/master
Commit: e288617f9eca260f2fac53df48862335247199ed
Parents: 7fbd757
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sun Dec 4 16:06:16 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Dec 4 18:48:53 2016 +0100
----------------------------------------------------------------------
...tractEventTimeWindowCheckpointingITCase.java | 782 ++++++++++++++++++
.../EventTimeAllWindowCheckpointingITCase.java | 2 +-
.../EventTimeWindowCheckpointingITCase.java | 800 -------------------
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
...ckendEventTimeWindowCheckpointingITCase.java | 26 +
6 files changed, 861 insertions(+), 801 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..583e42f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ *
+ * <p>Split into multiple test classes in order to decrease the runtime per backend
+ * and not run into CI infrastructure limits like no std output being emitted for
+ * I/O heavy variants.
+ */
+@SuppressWarnings("serial")
+public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger {
+
+ private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
+ private static final int PARALLELISM = 4;
+
+ private static LocalFlinkMiniCluster cluster;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private StateBackendEnum stateBackendEnum;
+ private AbstractStateBackend stateBackend;
+
+ AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
+ this.stateBackendEnum = stateBackendEnum;
+ }
+
+ enum StateBackendEnum {
+ MEM, FILE, ROCKSDB_FULLY_ASYNC
+ }
+
+ @BeforeClass
+ public static void startTestCluster() {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+
+ cluster = new LocalFlinkMiniCluster(config, false);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void stopTestCluster() {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ @Before
+ public void initStateBackend() throws IOException {
+ switch (stateBackendEnum) {
+ case MEM:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
+ break;
+ case FILE: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups);
+ break;
+ }
+ case ROCKSDB_FULLY_ASYNC: {
+ String rocksDb = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+ rdb.setDbStoragePath(rocksDb);
+ this.stateBackend = rdb;
+ break;
+ }
+
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(PARALLELISM);
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(1 << 15);
+ }
+
+ public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setMaxParallelism(maxParallelism);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ private ValueState<Integer> count;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ count = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("count", Integer.class, 0));
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+ // the window count state starts with the key, so that we get
+ // different count results for each key
+ if (count.value() == 0) {
+ count.update(tuple.<Long>getField(0).intValue());
+ }
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ count.update(count.value() + 1);
+ out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+ }
+ })
+ .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setMaxParallelism(2 * PARALLELISM);
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+ implements Checkpointed<Integer>, CheckpointListener
+ {
+ private static volatile boolean failedBefore = false;
+
+ private final int numKeys;
+ private final int numElementsToEmit;
+ private final int failureAfterNumElements;
+
+ private volatile int numElementsEmitted;
+ private volatile int numSuccessfulCheckpoints;
+ private volatile boolean running = true;
+
+ private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+ this.numKeys = numKeys;
+ this.numElementsToEmit = numElementsToEmitPerKey;
+ this.failureAfterNumElements = failureAfterNumElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+ // we loop longer than we have elements, to permit delayed checkpoints
+ // to still cause a failure
+ while (running) {
+
+ if (!failedBefore) {
+ // delay a bit, if we have not failed before
+ Thread.sleep(1);
+ if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+ // cause a failure if we have not failed before and have reached
+ // enough completed checkpoints and elements
+ failedBefore = true;
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (numElementsEmitted < numElementsToEmit &&
+ (failedBefore || numElementsEmitted <= failureAfterNumElements))
+ {
+ // the function failed before, or we are in the elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ int next = numElementsEmitted++;
+ for (long i = 0; i < numKeys; i++) {
+ ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+ }
+ ctx.emitWatermark(new Watermark(next));
+ }
+ }
+ else {
+
+ // if our work is done, delay a bit to prevent busy waiting
+ Thread.sleep(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numSuccessfulCheckpoints++;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsEmitted;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ numElementsEmitted = state;
+ }
+
+ public static void reset() {
+ failedBefore = false;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements Checkpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private ValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+ // it can happen that a checkpoint happens when the complete success state is
+ // already set. In that case we restart with the final state and would never
+ // finish because no more elements arrive.
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount != numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ if (seenAll) {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The sink must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+ // the sum should be "sum (start .. end-1)"
+
+ int expectedSum = 0;
+ for (long i = value.f1; i < value.f2; i++) {
+ // only sum up positive vals, to filter out the negative start of the
+ // first sliding windows
+ if (i > 0) {
+ expectedSum += i;
+ }
+ }
+
+ assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return this.windowCounts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Long, Integer> state) {
+ this.windowCounts.putAll(state);
+ }
+ }
+
+ // Sink for validating the stateful window counts
+ private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements Checkpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private CountValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The source must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+
+ // verify the contents of that window, the contents should be:
+ // (key + num windows so far)
+
+ assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return this.windowCounts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Long, Integer> state) {
+ this.windowCounts.putAll(state);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static class IntType {
+
+ public int value;
+
+ public IntType() {}
+
+ public IntType(int value) { this.value = value; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 5d17608..b493e42 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -53,7 +53,7 @@ import static org.junit.Assert.*;
* This verifies that checkpointing works correctly with event time windows.
*
* <p>
- * This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
+ * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 50079d1..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This verifies that checkpointing works correctly with event time windows. This is more
- * strict than {@link WindowCheckpointingITCase} because for event-time the contents
- * of the emitted windows are deterministic.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class EventTimeWindowCheckpointingITCase extends TestLogger {
-
- private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
- private static final int PARALLELISM = 4;
-
- private static LocalFlinkMiniCluster cluster;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- private StateBackendEnum stateBackendEnum;
- private AbstractStateBackend stateBackend;
-
- public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
- this.stateBackendEnum = stateBackendEnum;
- }
-
- @BeforeClass
- public static void startTestCluster() {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
-
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
-
- @AfterClass
- public static void stopTestCluster() {
- if (cluster != null) {
- cluster.stop();
- }
- }
-
- @Before
- public void initStateBackend() throws IOException {
- switch (stateBackendEnum) {
- case MEM:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
- break;
- case FILE: {
- String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups);
- break;
- }
- case ROCKSDB_FULLY_ASYNC: {
- String rocksDb = tempFolder.newFolder().getAbsolutePath();
- RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
- rdb.setDbStoragePath(rocksDb);
- this.stateBackend = rdb;
- break;
- }
-
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
-
- env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(PARALLELISM);
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(1 << 15);
- }
-
- public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
- env.setParallelism(PARALLELISM);
- env.setMaxParallelism(maxParallelism);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
-
- env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- private ValueState<Integer> count;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- count = getRuntimeContext().getState(
- new ValueStateDescriptor<>("count", Integer.class, 0));
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
-
- // the window count state starts with the key, so that we get
- // different count results for each key
- if (count.value() == 0) {
- count.update(tuple.<Long>getField(0).intValue());
- }
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- count.update(count.value() + 1);
- out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
- }
- })
- .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 100;
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
- env.setMaxParallelism(2 * PARALLELISM);
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
-
- env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 100;
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
-
- env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 100;
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
-
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
-
- env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
-
- // validate that the function has been opened properly
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements Checkpointed<Integer>, CheckpointListener
- {
- private static volatile boolean failedBefore = false;
-
- private final int numKeys;
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
- this.numKeys = numKeys;
- this.numElementsToEmit = numElementsToEmitPerKey;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
-
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements))
- {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- for (long i = 0; i < numKeys; i++) {
- ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
- }
- ctx.emitWatermark(new Watermark(next));
- }
- }
- else {
-
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsEmitted;
- }
-
- @Override
- public void restoreState(Integer state) {
- numElementsEmitted = state;
- }
-
- public static void reset() {
- failedBefore = false;
- }
- }
-
- private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private ValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
- // it can happen that a checkpoint happens when the complete success state is
- // already set. In that case we restart with the final state and would never
- // finish because no more elements arrive.
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- if (seenAll) {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The sink must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
- // the sum should be "sum (start .. end-1)"
-
- int expectedSum = 0;
- for (long i = value.f1; i < value.f2; i++) {
- // only sum up positive vals, to filter out the negative start of the
- // first sliding windows
- if (i > 0) {
- expectedSum += i;
- }
- }
-
- assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.windowCounts;
- }
-
- @Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.windowCounts.putAll(state);
- }
- }
-
- // Sink for validating the stateful window counts
- private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private CountValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The source must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
-
- // verify the contents of that window, the contents should be:
- // (key + num windows so far)
-
- assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.windowCounts;
- }
-
- @Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.windowCounts.putAll(state);
- }
- }
-
- // ------------------------------------------------------------------------
- // Parametrization for testing with different state backends
- // ------------------------------------------------------------------------
-
-
- @Parameterized.Parameters(name = "StateBackend = {0}")
- @SuppressWarnings("unchecked,rawtypes")
- public static Collection<Object[]> parameters(){
- return Arrays.asList(new Object[][] {
- {StateBackendEnum.MEM},
- {StateBackendEnum.FILE},
- {StateBackendEnum.ROCKSDB_FULLY_ASYNC}
- }
- );
- }
-
- private enum StateBackendEnum {
- MEM, FILE, ROCKSDB_FULLY_ASYNC
- }
-
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public static class IntType {
-
- public int value;
-
- public IntType() {}
-
- public IntType(int value) { this.value = value; }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..65fda09
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+ public FileBackendEventTimeWindowCheckpointingITCase() {
+ super(StateBackendEnum.FILE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..899b8d6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+ public MemBackendEventTimeWindowCheckpointingITCase() {
+ super(StateBackendEnum.MEM);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..14feb78
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+ public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+ super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
+ }
+}