You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/20 18:42:21 UTC
[2/4] flink git commit: [FLINK-2864] Make State of General-Purpose
Window Operators Fault-Tolerant
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
new file mode 100644
index 0000000..2733349
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows.
+ *
+ * <p>
+ * This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
+ */
+@SuppressWarnings("serial")
+public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+
+ private static ForkableFlinkMiniCluster cluster;
+
+
+ @BeforeClass
+ public static void startTestCluster() {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+
+ cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void stopTestCluster() {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 1;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS,
+ NUM_ELEMENTS_PER_KEY,
+ NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 1;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 1;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS,
+ NUM_ELEMENTS_PER_KEY,
+ NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(
+ new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 1;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS,
+ NUM_ELEMENTS_PER_KEY,
+ NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS),
+ Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .apply(
+ new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>>
+ implements Checkpointed<Integer>, CheckpointNotifier
+ {
+ private static volatile boolean failedBefore = false;
+
+ private final int numKeys;
+ private final int numElementsToEmit;
+ private final int failureAfterNumElements;
+
+ private volatile int numElementsEmitted;
+ private volatile int numSuccessfulCheckpoints;
+ private volatile boolean running = true;
+
+ private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+ this.numKeys = numKeys;
+ this.numElementsToEmit = numElementsToEmitPerKey;
+ this.failureAfterNumElements = failureAfterNumElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+ // we loop longer than we have elements, to permit delayed checkpoints
+ // to still cause a failure
+ while (running) {
+
+ if (!failedBefore) {
+ // delay a bit, if we have not failed before
+ Thread.sleep(1);
+ if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+ // cause a failure if we have not failed before and have reached
+ // enough completed checkpoints and elements
+ failedBefore = true;
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (numElementsEmitted < numElementsToEmit &&
+ (failedBefore || numElementsEmitted <= failureAfterNumElements))
+ {
+ // the function failed before, or we are in the elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ int next = numElementsEmitted++;
+ for (long i = 0; i < numKeys; i++) {
+ ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+ }
+ ctx.emitWatermark(new Watermark(next));
+ }
+ }
+ else {
+ // exit at some point so that we don't deadlock
+ if (numElementsEmitted > numElementsToEmit * 5) {
+// running = false;
+ System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
+ }
+ // if our work is done, delay a bit to prevent busy waiting
+ Thread.sleep(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numSuccessfulCheckpoints++;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsEmitted;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ numElementsEmitted = state;
+ }
+
+ public static void reset() {
+ failedBefore = false;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements Checkpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private ValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The source must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+ // the sum should be "sum (start .. end-1)"
+
+ int expectedSum = 0;
+ for (long i = value.f1; i < value.f2; i++) {
+ // only sum up positive vals, to filter out the negative start of the
+ // first sliding windows
+ if (i > 0) {
+ expectedSum += i;
+ }
+ }
+
+ assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return this.windowCounts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Long, Integer> state) {
+ this.windowCounts.putAll(state);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
+ try {
+ env.execute(jobName);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ root.printStackTrace();
+ fail("Test failed: " + root.getMessage());
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+ public static class IntType {
+
+ public int value;
+
+ public IntType() {}
+
+ public IntType(int value) { this.value = value; }
+ }
+
+ static final class SuccessException extends Exception {
+ private static final long serialVersionUID = -9218191172606739598L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..4d1d2c3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ */
+@SuppressWarnings("serial")
+public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+
+ private static ForkableFlinkMiniCluster cluster;
+
+
+ @BeforeClass
+ public static void startTestCluster() {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+
+ cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void stopTestCluster() {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedTumblingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .apply(
+ new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedSlidingTimeWindow() {
+ final int NUM_ELEMENTS_PER_KEY = 3000;
+ final int WINDOW_SIZE = 1000;
+ final int WINDOW_SLIDE = 100;
+ final int NUM_KEYS = 100;
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getLeaderRPCPort());
+
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ env
+ .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .apply(
+ new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>>
+ implements Checkpointed<Integer>, CheckpointNotifier
+ {
+ private static volatile boolean failedBefore = false;
+
+ private final int numKeys;
+ private final int numElementsToEmit;
+ private final int failureAfterNumElements;
+
+ private volatile int numElementsEmitted;
+ private volatile int numSuccessfulCheckpoints;
+ private volatile boolean running = true;
+
+ private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+ this.numKeys = numKeys;
+ this.numElementsToEmit = numElementsToEmitPerKey;
+ this.failureAfterNumElements = failureAfterNumElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+ // we loop longer than we have elements, to permit delayed checkpoints
+ // to still cause a failure
+ while (running) {
+
+ if (!failedBefore) {
+ // delay a bit, if we have not failed before
+ Thread.sleep(1);
+ if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+ // cause a failure if we have not failed before and have reached
+ // enough completed checkpoints and elements
+ failedBefore = true;
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (numElementsEmitted < numElementsToEmit &&
+ (failedBefore || numElementsEmitted <= failureAfterNumElements))
+ {
+ // the function failed before, or we are in the elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ int next = numElementsEmitted++;
+ for (long i = 0; i < numKeys; i++) {
+ ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+ }
+ ctx.emitWatermark(new Watermark(next));
+ }
+ }
+ else {
+ // exit at some point so that we don't deadlock
+ if (numElementsEmitted > numElementsToEmit * 5) {
+// running = false;
+ System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
+ }
+
+ // if our work is done, delay a bit to prevent busy waiting
+ Thread.sleep(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numSuccessfulCheckpoints++;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsEmitted;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ numElementsEmitted = state;
+ }
+
+ public static void reset() {
+ failedBefore = false;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements Checkpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private ValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The source must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+ // the sum should be "sum (start .. end-1)"
+
+ int expectedSum = 0;
+ for (long i = value.f1; i < value.f2; i++) {
+ // only sum up positive vals, to filter out the negative start of the
+ // first sliding windows
+ if (i > 0) {
+ expectedSum += i;
+ }
+ }
+
+ assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return this.windowCounts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Long, Integer> state) {
+ this.windowCounts.putAll(state);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
+ try {
+ env.execute(jobName);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ root.printStackTrace();
+ fail("Test failed: " + root.getMessage());
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+ public static class IntType {
+
+ public int value;
+
+ public IntType() {}
+
+ public IntType(int value) { this.value = value; }
+ }
+
// marker exception thrown by the validating sink once all expected results were seen;
// tryExecute() unwraps it to terminate the otherwise unbounded streaming job as a success
static final class SuccessException extends Exception {
	private static final long serialVersionUID = -9218191172606739598L;
}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 298ae5c..e297486 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -36,13 +36,20 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -56,8 +63,15 @@ import static org.junit.Assert.fail;
* serializability is handled correctly.
*/
@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
public class WindowCheckpointingITCase extends TestLogger {
+ private TimeCharacteristic timeCharacteristic;
+
+ public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
+ this.timeCharacteristic = timeCharacteristic;
+ }
+
private static final int PARALLELISM = 4;
private static ForkableFlinkMiniCluster cluster;
@@ -94,7 +108,8 @@ public class WindowCheckpointingITCase extends TestLogger {
"localhost", cluster.getLeaderRPCPort());
env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
env.setNumberOfExecutionRetries(3);
env.getConfig().disableSysoutLogging();
@@ -151,7 +166,8 @@ public class WindowCheckpointingITCase extends TestLogger {
"localhost", cluster.getLeaderRPCPort());
env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
env.setNumberOfExecutionRetries(3);
env.getConfig().disableSysoutLogging();
@@ -208,7 +224,8 @@ public class WindowCheckpointingITCase extends TestLogger {
"localhost", cluster.getLeaderRPCPort());
env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
env.setNumberOfExecutionRetries(3);
env.getConfig().disableSysoutLogging();
@@ -266,7 +283,8 @@ public class WindowCheckpointingITCase extends TestLogger {
"localhost", cluster.getLeaderRPCPort());
env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ env.setStreamTimeCharacteristic(timeCharacteristic);
+ env.getConfig().setAutoWatermarkInterval(10);
env.enableCheckpointing(100);
env.setNumberOfExecutionRetries(3);
env.getConfig().disableSysoutLogging();
@@ -462,6 +480,18 @@ public class WindowCheckpointingITCase extends TestLogger {
}
// ------------------------------------------------------------------------
+ // Parametrization for testing different time characteristics
+ // ------------------------------------------------------------------------
+
+ @Parameterized.Parameters(name = "TimeCharacteristic = {0}")
+ @SuppressWarnings("unchecked,rawtypes")
+ public static Collection<TimeCharacteristic[]> timeCharacteristic(){
+ return Arrays.asList(new TimeCharacteristic[]{TimeCharacteristic.ProcessingTime},
+ new TimeCharacteristic[]{TimeCharacteristic.IngestionTime}
+ );
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -498,4 +528,4 @@ public class WindowCheckpointingITCase extends TestLogger {
static final class SuccessException extends Exception {
private static final long serialVersionUID = -9218191172606739598L;
}
-}
\ No newline at end of file
+}