You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/11 15:53:35 UTC
[1/2] flink git commit: [FLINK-2423] [streaming] ITCase for checkpoint notifications
Repository: flink
Updated Branches:
refs/heads/master f50ae26a2 -> 10ce2e2d0
[FLINK-2423] [streaming] ITCase for checkpoint notifications
Closes #980
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10ce2e2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10ce2e2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10ce2e2d
Branch: refs/heads/master
Commit: 10ce2e2d033ddafb326c3b9b81e19e669c19df96
Parents: 4ca7df5
Author: mbalassi <mb...@apache.org>
Authored: Sat Aug 8 22:02:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 11 14:12:00 2015 +0200
----------------------------------------------------------------------
.../StreamCheckpointNotifierITCase.java | 135 ++++++++++---------
1 file changed, 70 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10ce2e2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 4c1ac4e..de8ee9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -18,10 +18,11 @@
package org.apache.flink.test.checkpointing;
-import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -31,6 +32,8 @@ import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.util.Collector;
import java.io.IOException;
@@ -41,7 +44,6 @@ import java.util.Random;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Integration test for the {@link CheckpointNotifier} interface. The test ensures that
@@ -50,6 +52,10 @@ import static org.junit.Assert.fail;
* called for a deliberately failed checkpoint.
*
* <p>
+ * The topology tested here includes a number of {@link OneInputStreamOperator}s and a
+ * {@link TwoInputStreamOperator}.
+ *
+ * <p>
* Note that as a result of doing the checks on the task level there is no way to verify
* that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every
* successfully completed checkpoint.
@@ -57,40 +63,37 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase {
- final long NUM_STRINGS = 10_000_000L;
+ final long NUM_LONGS = 10_000_000L;
/**
* Runs the following program:
*
* <pre>
- * [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+ * [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+ DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(NUM_LONGS));
stream
// -------------- first vertex, chained to the src ----------------
- .filter(new StringRichFilterFunction())
+ .filter(new LongRichFilterFunction())
- // -------------- second vertex, applying the co-map ----------------
+ // -------------- second vertex, applying the co-map ----------------
.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
// -------------- third vertex - the stateful one that also fails ----------------
- .map(new StringPrefixCountRichMapFunction())
- .startNewChain()
.map(new IdentityMapFunction())
+ .startNewChain()
- // -------------- fourth vertex - reducer and the sink ----------------
- .groupBy("prefix")
- .reduce(new OnceFailingReducer(NUM_STRINGS))
- .addSink(new SinkFunction<PrefixCount>() {
+ // -------------- fourth vertex - reducer and the sink ----------------
+ .groupBy(0)
+ .reduce(new OnceFailingReducer(NUM_LONGS))
+ .addSink(new SinkFunction<Tuple1<Long>>() {
@Override
- public void invoke(PrefixCount value) {
+ public void invoke(Tuple1<Long> value) {
// do nothing
}
});
@@ -98,18 +101,24 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
@Override
public void postSubmit() {
- List[][] checkList = new List[][]{ StringGeneratingSourceFunction.completedCheckpoints,
+ List[][] checkList = new List[][]{ GeneratingSourceFunction.completedCheckpoints,
IdentityMapFunction.completedCheckpoints,
- StringPrefixCountRichMapFunction.completedCheckpoints,
+ LongRichFilterFunction.completedCheckpoints,
LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
+ long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
+
for(List[] parallelNotifications : checkList) {
for (int i = 0; i < PARALLELISM; i++){
List<Long> notifications = parallelNotifications[i];
assertTrue("No checkpoint notification was received.",
notifications.size() > 0);
assertFalse("Failure checkpoint was marked as completed.",
- notifications.contains(OnceFailingReducer.failureCheckpointID));
+ notifications.contains(failureCheckpointID));
+ assertFalse("No checkpoint received before failure.",
+ notifications.get(0) == failureCheckpointID);
+ assertFalse("No checkpoint received after failure.",
+ notifications.get(notifications.size() - 1) == failureCheckpointID);
assertTrue("Checkpoint notification was received multiple times",
notifications.size() == new HashSet<Long>(notifications).size());
}
@@ -120,17 +129,20 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
// Custom Functions
// --------------------------------------------------------------------------------------------
- private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements ParallelSourceFunction<String>, CheckpointNotifier {
+ /**
+ * Generates some Long values and as an implementation for the {@link CheckpointNotifier}
+ * interface it stores all the checkpoint ids it has seen in a static list.
+ */
+ private static class GeneratingSourceFunction extends RichSourceFunction<Long>
+ implements ParallelSourceFunction<Long>, CheckpointNotifier {
// operator life cycle
private volatile boolean isRunning;
// operator behaviour
private final long numElements;
- private Random rnd;
+ private long result;
- private StringBuilder stringBuilder;
private OperatorState<Integer> index;
private int step;
@@ -138,14 +150,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
private int subtaskId;
public static List[] completedCheckpoints = new List[PARALLELISM];
- StringGeneratingSourceFunction(long numElements) {
+ GeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@Override
public void open(Configuration parameters) throws IOException {
- rnd = new Random();
- stringBuilder = new StringBuilder();
step = getRuntimeContext().getNumberOfParallelSubtasks();
subtaskId = getRuntimeContext().getIndexOfThisSubtask();
index = getRuntimeContext().getOperatorState("index", subtaskId, false);
@@ -159,16 +169,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
}
@Override
- public void run(SourceContext<String> ctx) throws Exception {
+ public void run(SourceContext<Long> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
while (isRunning && index.value() < numElements) {
- char first = (char) ((index.value() % 40) + 40);
- stringBuilder.setLength(0);
- stringBuilder.append(first);
-
- String result = randomString(stringBuilder, rnd);
+ result = index.value() % 10;
synchronized (lockingObject) {
index.update(index.value() + step);
@@ -182,32 +188,25 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
isRunning = false;
}
- private static String randomString(StringBuilder bld, Random rnd) {
- final int len = rnd.nextInt(10) + 5;
-
- for (int i = 0; i < len; i++) {
- char next = (char) (rnd.nextInt(20000) + 33);
- bld.append(next);
- }
-
- return bld.toString();
- }
-
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
completedCheckpoints[subtaskId].add(checkpointId);
}
}
- private static class IdentityMapFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ /**
+ * Identity transform on Long values wrapping the output in a tuple. As an implementation
+ * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids it has seen in a static list.
+ */
+ private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
implements CheckpointNotifier {
public static List[] completedCheckpoints = new List[PARALLELISM];
private int subtaskId;
@Override
- public PrefixCount map(PrefixCount value) throws Exception {
- return value;
+ public Tuple1<Long> map(Long value) throws Exception {
+ return Tuple1.of(value);
}
@Override
@@ -226,7 +225,10 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
}
}
- private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> implements Checkpointed<Long>{
+ /**
+ * Reducer that causes one failure between seeing 40% to 70% of the records.
+ */
+ private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> implements Checkpointed<Long> {
private static volatile boolean hasFailed = false;
public static volatile long failureCheckpointID;
@@ -251,9 +253,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
}
@Override
- public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+ public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) throws Exception {
count++;
- value1.count += value2.count;
+ value1.f0 += value2.f0;
return value1;
}
@@ -273,20 +275,23 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
}
}
- private static class StringRichFilterFunction implements FilterFunction<String> {
- @Override
- public boolean filter(String value) {
- return value.length() < 100;
- }
- }
-
- private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+ /**
+ * Filter on Long values supposedly letting all values through. As an implementation
+ * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids
+ * it has seen in a static list.
+ */
+ private static class LongRichFilterFunction extends RichFilterFunction<Long>
implements CheckpointNotifier {
public static List[] completedCheckpoints = new List[PARALLELISM];
private int subtaskId;
@Override
+ public boolean filter(Long value) {
+ return value < 100;
+ }
+
+ @Override
public void open(Configuration conf) throws IOException {
subtaskId = getRuntimeContext().getIndexOfThisSubtask();
@@ -300,14 +305,14 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
public void notifyCheckpointComplete(long checkpointId) throws Exception {
completedCheckpoints[subtaskId].add(checkpointId);
}
-
- @Override
- public PrefixCount map(String value) throws IOException {
- return new PrefixCount(value.substring(0, 1), value, 1L);
- }
}
- private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String>
+ /**
+ * CoFlatMap on Long values as identity transform on the left input, while ignoring the right.
+ * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint
+ * ids it has seen in a static list.
+ */
+ private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
implements CheckpointNotifier {
public static List[] completedCheckpoints = new List[PARALLELISM];
@@ -324,12 +329,12 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
}
@Override
- public void flatMap1(String value, Collector<String> out) throws IOException {
+ public void flatMap1(Long value, Collector<Long> out) throws IOException {
out.collect(value);
}
@Override
- public void flatMap2(String value, Collector<String> out) throws IOException {
+ public void flatMap2(Long value, Collector<Long> out) throws IOException {
// we ignore the values from the second input
}
[2/2] flink git commit: [streaming] [tests] Checkpointing tests refactor & cleanup
Posted by mb...@apache.org.
[streaming] [tests] Checkpointing tests refactor & cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ca7df59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ca7df59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ca7df59
Branch: refs/heads/master
Commit: 4ca7df592538edb6fd69159c7b955673ed3af820
Parents: f50ae26
Author: mbalassi <mb...@apache.org>
Authored: Tue Aug 4 10:11:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Aug 11 14:12:00 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 2 +-
.../CoStreamCheckpointingITCase.java | 215 ++++--------
.../PartitionedStateCheckpointingITCase.java | 92 ++---
.../checkpointing/StateCheckpoinedITCase.java | 158 +++------
.../StreamCheckpointNotifierITCase.java | 341 +++++++++++++++++++
.../StreamCheckpointingITCase.java | 224 ++++--------
.../StreamFaultToleranceTestBase.java | 150 ++++++++
7 files changed, 693 insertions(+), 489 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 080272b..b20482e 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1355,7 +1355,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
}
{% endhighlight %}
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
+Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
### Checkpointed interface
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 6197092..258bee8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -22,28 +22,22 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Random;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* A simple test that runs a streaming topology with checkpointing enabled. This differs from
@@ -64,44 +58,9 @@ import static org.junit.Assert.*;
* state reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
-public class CoStreamCheckpointingITCase {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- private static ForkableFlinkMiniCluster cluster;
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new ForkableFlinkMiniCluster(config, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
-
+public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+ final long NUM_STRINGS = 10_000_000L;
/**
* Runs the following program:
@@ -110,102 +69,76 @@ public class CoStreamCheckpointingITCase {
* [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
- @Test
- public void runCheckpointedProgram() {
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
- final long NUM_STRINGS = 10000000L;
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(500);
- env.getConfig().disableSysoutLogging();
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
- stream
- // -------------- first vertex, chained to the source ----------------
- .filter(new StringRichFilterFunction())
-
- // -------------- second vertex - the stateful one that also fails ----------------
- .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
-
- // -------------- third vertex - the stateful one that also fails ----------------
- .map(new StringPrefixCountRichMapFunction())
- .startNewChain()
- .map(new StatefulCounterFunction())
-
- // -------------- fourth vertex - reducer and the sink ----------------
- .groupBy("prefix")
- .reduce(new OnceFailingReducer(NUM_STRINGS))
- .addSink(new RichSinkFunction<PrefixCount>() {
-
- private Map<Character, Long> counts = new HashMap<Character, Long>();
-
- @Override
- public void invoke(PrefixCount value) {
- Character first = value.prefix.charAt(0);
- Long previous = counts.get(first);
- if (previous == null) {
- counts.put(first, value.count);
- } else {
- counts.put(first, Math.max(previous, value.count));
- }
- }
-
-// @Override
-// public void close() {
-// for (Long count : counts.values()) {
-// assertEquals(NUM_STRINGS / 40, count.longValue());
-// }
-// }
- });
-
- env.execute();
-
- long filterSum = 0;
- for (long l : StringRichFilterFunction.counts) {
- filterSum += l;
- }
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
- long coMapSum = 0;
- for (long l : LeftIdentityCoRichFlatMapFunction.counts) {
- coMapSum += l;
- }
+ stream
+ // -------------- first vertex, chained to the source ----------------
+ .filter(new StringRichFilterFunction())
- long mapSum = 0;
- for (long l : StringPrefixCountRichMapFunction.counts) {
- mapSum += l;
- }
+ // -------------- second vertex - the stateful one that also fails ----------------
+ .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
- long countSum = 0;
- for (long l : StatefulCounterFunction.counts) {
- countSum += l;
- }
+ // -------------- third vertex - the stateful one that also fails ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new StatefulCounterFunction())
- if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
- Assert.fail("Restore was never called on counting Map function.");
- }
+ // -------------- fourth vertex - reducer and the sink ----------------
+ .groupBy("prefix")
+ .reduce(new OnceFailingReducer(NUM_STRINGS))
+ .addSink(new SinkFunction<PrefixCount>() {
- if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
- Assert.fail("Restore was never called on counting CoMap function.");
- }
+ @Override
+ public void invoke(PrefixCount value) throws Exception {
+ // Do nothing here
+ }
+ });
+ }
+
+ @Override
+ public void postSubmit() {
+ long filterSum = 0;
+ for (long l : StringRichFilterFunction.counts) {
+ filterSum += l;
+ }
- // verify that we counted exactly right
+ long coMapSum = 0;
+ for (long l : LeftIdentityCoRichFlatMapFunction.counts) {
+ coMapSum += l;
+ }
+
+ long mapSum = 0;
+ for (long l : StringPrefixCountRichMapFunction.counts) {
+ mapSum += l;
+ }
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, coMapSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
+ long countSum = 0;
+ for (long l : StatefulCounterFunction.counts) {
+ countSum += l;
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+
+ if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
+ Assert.fail("Restore was never called on counting Map function.");
+ }
+
+ if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
+ Assert.fail("Restore was never called on counting CoMap function.");
}
+
+ // verify that we counted exactly right
+
+ assertEquals(NUM_STRINGS, filterSum);
+ assertEquals(NUM_STRINGS, coMapSum);
+ assertEquals(NUM_STRINGS, mapSum);
+ assertEquals(NUM_STRINGS, countSum);
}
+
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
@@ -260,7 +193,6 @@ public class CoStreamCheckpointingITCase {
synchronized (lockingObject) {
index.update(index.value() + step);
-// System.out.println("SOURCE EMIT: " + result);
ctx.collect(result);
}
}
@@ -341,30 +273,6 @@ public class CoStreamCheckpointingITCase {
}
}
- // --------------------------------------------------------------------------------------------
- // Custom Type Classes
- // --------------------------------------------------------------------------------------------
-
- public static class PrefixCount {
-
- public String prefix;
- public String value;
- public long count;
-
- public PrefixCount() {}
-
- public PrefixCount(String prefix, String value, long count) {
- this.prefix = prefix;
- this.value = value;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return prefix + " / " + value;
- }
- }
-
private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
Long count = 0L;
@@ -434,7 +342,6 @@ public class CoStreamCheckpointingITCase {
@Override
public void flatMap1(String value, Collector<String> out) throws IOException {
count += 1;
-// System.out.println("Co-Map COUNT: " + count);
out.collect(value);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 88361e2..d942a9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -52,82 +52,36 @@ import org.junit.Test;
* It is designed to check partitioned states.
*/
@SuppressWarnings("serial")
-public class PartitionedStateCheckpointingITCase {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- private static ForkableFlinkMiniCluster cluster;
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new ForkableFlinkMiniCluster(config, false);
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
+public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
- @SuppressWarnings("unchecked")
- @Test
- public void runCheckpointedProgram() {
+ final long NUM_STRINGS = 10_000_000L;
- final long NUM_STRINGS = 10000000L;
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", (NUM_STRINGS/2) % 40 == 0);
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
- cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(500);
- env.getConfig().disableSysoutLogging();
-
- DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
- DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+ DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+ DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
- stream1.union(stream2)
- .groupBy(new IdentityKeySelector<Integer>())
- .map(new OnceFailingPartitionedSum(NUM_STRINGS))
- .keyBy(0)
- .addSink(new CounterSink());
-
- env.execute();
+ stream1.union(stream2)
+ .groupBy(new IdentityKeySelector<Integer>())
+ .map(new OnceFailingPartitionedSum(NUM_STRINGS))
+ .keyBy(0)
+ .addSink(new CounterSink());
+ }
- // verify that we counted exactly right
- for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
- assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
- }
- System.out.println("new");
- for (Long count : CounterSink.allCounts.values()) {
- assertEquals(new Long(NUM_STRINGS / 40), count);
- }
-
- assertEquals(40, CounterSink.allCounts.size());
- assertEquals(40, OnceFailingPartitionedSum.allSums.size());
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ @Override
+ public void postSubmit() {
+ // verify that we counted exactly right
+ for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
+ assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
+ }
+ for (Long count : CounterSink.allCounts.values()) {
+ assertEquals(new Long(NUM_STRINGS / 40), count);
}
+
+ assertEquals(40, CounterSink.allCounts.size());
+ assertEquals(40, OnceFailingPartitionedSum.allSums.size());
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 39ff2e5..072086b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -49,48 +49,15 @@ import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
- *
+ *
* The test triggers a failure after a while and verifies that, after completion, the
- * state reflects the "exactly once" semantics.
+ * state defined with either the {@link OperatorState} or the {@link Checkpointed}
+ * interface reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
-public class StateCheckpoinedITCase {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- private static ForkableFlinkMiniCluster cluster;
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new ForkableFlinkMiniCluster(config, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
+public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
+ final long NUM_STRINGS = 10_000_000L;
/**
* Runs the following program:
@@ -99,69 +66,56 @@ public class StateCheckpoinedITCase {
* [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
- @Test
- public void runCheckpointedProgram() {
-
- final long NUM_STRINGS = 10000000L;
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(500);
- env.getConfig().disableSysoutLogging();
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
- stream
- // -------------- first vertex, chained to the source ----------------
- .filter(new StringRichFilterFunction())
- // -------------- seconds vertex - one-to-one connected ----------------
- .map(new StringPrefixCountRichMapFunction())
- .startNewChain()
- .map(new StatefulCounterFunction())
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
- // -------------- third vertex - reducer and the sink ----------------
- .partitionByHash("prefix")
- .flatMap(new OnceFailingAggregator(NUM_STRINGS))
- .addSink(new ValidatingSink());
+ stream
+ // -------------- first vertex, chained to the source ----------------
+ .filter(new StringRichFilterFunction())
- env.execute();
-
- long filterSum = 0;
- for (long l : StringRichFilterFunction.counts) {
- filterSum += l;
- }
+ // -------------- second vertex - one-to-one connected ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new StatefulCounterFunction())
- long mapSum = 0;
- for (long l : StringPrefixCountRichMapFunction.counts) {
- mapSum += l;
- }
+ // -------------- third vertex - reducer and the sink ----------------
+ .partitionByHash("prefix")
+ .flatMap(new OnceFailingAggregator(NUM_STRINGS))
+ .addSink(new ValidatingSink());
+ }
- long countSum = 0;
- for (long l : StatefulCounterFunction.counts) {
- countSum += l;
- }
+ @Override
+ public void postSubmit() {
+ long filterSum = 0;
+ for (long l : StringRichFilterFunction.counts) {
+ filterSum += l;
+ }
+
+ long mapSum = 0;
+ for (long l : StringPrefixCountRichMapFunction.counts) {
+ mapSum += l;
+ }
- // verify that we counted exactly right
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
+ long countSum = 0;
+ for (long l : StatefulCounterFunction.counts) {
+ countSum += l;
+ }
- for (Map<Character, Long> map : ValidatingSink.maps) {
- for (Long count : map.values()) {
- assertEquals(NUM_STRINGS / 40, count.longValue());
- }
+ // verify that we counted exactly right
+ assertEquals(NUM_STRINGS, filterSum);
+ assertEquals(NUM_STRINGS, mapSum);
+ assertEquals(NUM_STRINGS, countSum);
+
+ for (Map<Character, Long> map : ValidatingSink.maps) {
+ for (Long count : map.values()) {
+ assertEquals(NUM_STRINGS / 40, count.longValue());
}
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
}
-
+
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
@@ -406,28 +360,4 @@ public class StateCheckpoinedITCase {
counts.putAll(state);
}
}
-
- // --------------------------------------------------------------------------------------------
- // Custom Type Classes
- // --------------------------------------------------------------------------------------------
-
- public static class PrefixCount implements Serializable {
-
- public String prefix;
- public String value;
- public long count;
-
- public PrefixCount() {}
-
- public PrefixCount(String prefix, String value, long count) {
- this.prefix = prefix;
- this.value = value;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return prefix + " / " + value;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
new file mode 100644
index 0000000..4c1ac4e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the {@link CheckpointNotifier} interface. The test ensures that
+ * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for some completed
+ * checkpoints, that it is called at most once for any checkpoint id and that it is not
+ * called for a deliberately failed checkpoint.
+ *
+ * <p>
+ * Note that as a result of doing the checks on the task level there is no way to verify
+ * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every
+ * successfully completed checkpoint.
+ */
+@SuppressWarnings("serial")
+public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase {
+
+ final long NUM_STRINGS = 10_000_000L;
+
+ /**
+ * Runs the following program:
+ *
+ * <pre>
+ * [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+ * </pre>
+ */
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+
+ assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+ stream
+ // -------------- first vertex, chained to the src ----------------
+ .filter(new StringRichFilterFunction())
+
+ // -------------- second vertex, applying the co-map ----------------
+ .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
+
+ // -------------- third vertex - prefix-count map followed by an identity map ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new IdentityMapFunction())
+
+ // -------------- fourth vertex - reducer and the sink ----------------
+ .groupBy("prefix")
+ .reduce(new OnceFailingReducer(NUM_STRINGS))
+ .addSink(new SinkFunction<PrefixCount>() {
+ @Override
+ public void invoke(PrefixCount value) {
+ // do nothing
+ }
+ });
+ }
+
+ @Override
+ public void postSubmit() {
+ List[][] checkList = new List[][]{ StringGeneratingSourceFunction.completedCheckpoints,
+ IdentityMapFunction.completedCheckpoints,
+ StringPrefixCountRichMapFunction.completedCheckpoints,
+ LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
+
+ for(List[] parallelNotifications : checkList) {
+ for (int i = 0; i < PARALLELISM; i++){
+ List<Long> notifications = parallelNotifications[i];
+ assertTrue("No checkpoint notification was received.",
+ notifications.size() > 0);
+ assertFalse("Failure checkpoint was marked as completed.",
+ notifications.contains(OnceFailingReducer.failureCheckpointID));
+ assertTrue("Checkpoint notification was received multiple times",
+ notifications.size() == new HashSet<Long>(notifications).size());
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
+ implements ParallelSourceFunction<String>, CheckpointNotifier {
+
+ // operator life cycle
+ private volatile boolean isRunning;
+
+ // operator behaviour
+ private final long numElements;
+ private Random rnd;
+
+ private StringBuilder stringBuilder;
+ private OperatorState<Integer> index;
+ private int step;
+
+ // test behaviour
+ private int subtaskId;
+ public static List[] completedCheckpoints = new List[PARALLELISM];
+
+ StringGeneratingSourceFunction(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ rnd = new Random();
+ stringBuilder = new StringBuilder();
+ step = getRuntimeContext().getNumberOfParallelSubtasks();
+ subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+ index = getRuntimeContext().getOperatorState("index", subtaskId, false);
+
+ // Create a collection on the first open
+ if (completedCheckpoints[subtaskId] == null) {
+ completedCheckpoints[subtaskId] = new ArrayList();
+ }
+
+ isRunning = true;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ final Object lockingObject = ctx.getCheckpointLock();
+
+ while (isRunning && index.value() < numElements) {
+ char first = (char) ((index.value() % 40) + 40);
+
+ stringBuilder.setLength(0);
+ stringBuilder.append(first);
+
+ String result = randomString(stringBuilder, rnd);
+
+ synchronized (lockingObject) {
+ index.update(index.value() + step);
+ ctx.collect(result);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ private static String randomString(StringBuilder bld, Random rnd) {
+ final int len = rnd.nextInt(10) + 5;
+
+ for (int i = 0; i < len; i++) {
+ char next = (char) (rnd.nextInt(20000) + 33);
+ bld.append(next);
+ }
+
+ return bld.toString();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ completedCheckpoints[subtaskId].add(checkpointId);
+ }
+ }
+
+ private static class IdentityMapFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ implements CheckpointNotifier {
+
+ public static List[] completedCheckpoints = new List[PARALLELISM];
+ private int subtaskId;
+
+ @Override
+ public PrefixCount map(PrefixCount value) throws Exception {
+ return value;
+ }
+
+ @Override
+ public void open(Configuration conf) throws IOException {
+ subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+ // Create a collection on the first open
+ if (completedCheckpoints[subtaskId] == null) {
+ completedCheckpoints[subtaskId] = new ArrayList();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ completedCheckpoints[subtaskId].add(checkpointId);
+ }
+ }
+
+ private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> implements Checkpointed<Long>{
+
+ private static volatile boolean hasFailed = false;
+ public static volatile long failureCheckpointID;
+
+ private final long numElements;
+
+ private long failurePos;
+ private long count;
+
+
+ OnceFailingReducer(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+ long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+ failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+ count = 0;
+ }
+
+ @Override
+ public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+ count++;
+ value1.count += value2.count;
+ return value1;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if (!hasFailed && count >= failurePos) {
+ hasFailed = true;
+ failureCheckpointID = checkpointId;
+ throw new Exception("Test Failure");
+ }
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class StringRichFilterFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String value) {
+ return value.length() < 100;
+ }
+ }
+
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+ implements CheckpointNotifier {
+
+ public static List[] completedCheckpoints = new List[PARALLELISM];
+ private int subtaskId;
+
+ @Override
+ public void open(Configuration conf) throws IOException {
+ subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+ // Create a collection on the first open
+ if (completedCheckpoints[subtaskId] == null) {
+ completedCheckpoints[subtaskId] = new ArrayList();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ completedCheckpoints[subtaskId].add(checkpointId);
+ }
+
+ @Override
+ public PrefixCount map(String value) throws IOException {
+ return new PrefixCount(value.substring(0, 1), value, 1L);
+ }
+ }
+
+ private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String>
+ implements CheckpointNotifier {
+
+ public static List[] completedCheckpoints = new List[PARALLELISM];
+ private int subtaskId;
+
+ @Override
+ public void open(Configuration conf) throws IOException {
+ subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+ // Create a collection on the first open
+ if (completedCheckpoints[subtaskId] == null) {
+ completedCheckpoints[subtaskId] = new ArrayList();
+ }
+ }
+
+ @Override
+ public void flatMap1(String value, Collector<String> out) throws IOException {
+ out.collect(value);
+ }
+
+ @Override
+ public void flatMap2(String value, Collector<String> out) throws IOException {
+ // we ignore the values from the second input
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ completedCheckpoints[subtaskId].add(checkpointId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 93dda5f..d54d425 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,21 +18,9 @@
package org.apache.flink.test.checkpointing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -40,56 +28,25 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
*
* The test triggers a failure after a while and verifies that, after completion, the
- * state reflects the "exactly once" semantics.
+ * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
-public class StreamCheckpointingITCase {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- private static ForkableFlinkMiniCluster cluster;
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new ForkableFlinkMiniCluster(config, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
+public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+ final long NUM_STRINGS = 10_000_000L;
/**
* Runs the following program:
@@ -98,78 +55,63 @@ public class StreamCheckpointingITCase {
* [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
* </pre>
*/
- @Test
- public void runCheckpointedProgram() {
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+ stream
+ // -------------- first vertex, chained to the source ----------------
+ .filter(new StringRichFilterFunction()).shuffle()
+
+ // -------------- second vertex - the stateful one that also fails ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new StatefulCounterFunction())
+
+ // -------------- third vertex - counter and the sink ----------------
+ .groupBy("prefix")
+ .map(new OnceFailingPrefixCounter(NUM_STRINGS))
+ .addSink(new SinkFunction<PrefixCount>() {
+
+ @Override
+ public void invoke(PrefixCount value) throws Exception {
+ // Do nothing here
+ }
+ });
+ }
- final long NUM_STRINGS = 10000000L;
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getJobManagerRPCPort());
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(500);
- env.getConfig().disableSysoutLogging();
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
-
- stream
- // -------------- first vertex, chained to the source ----------------
- .filter(new StringRichFilterFunction()).shuffle()
-
- // -------------- seconds vertex - the stateful one that also fails ----------------
- .map(new StringPrefixCountRichMapFunction())
- .startNewChain()
- .map(new StatefulCounterFunction())
-
- // -------------- third vertex - counter and the sink ----------------
- .groupBy("prefix")
- .map(new OnceFailingPrefixCounter(NUM_STRINGS))
- .addSink(new SinkFunction<PrefixCount>() {
-
- @Override
- public void invoke(PrefixCount value) throws Exception {
- // Do nothing here
- }
- });
-
- env.execute();
-
- long filterSum = 0;
- for (long l : StringRichFilterFunction.counts) {
- filterSum += l;
- }
+ @Override
+ public void postSubmit() {
+ long filterSum = 0;
+ for (long l : StringRichFilterFunction.counts) {
+ filterSum += l;
+ }
- long mapSum = 0;
- for (long l : StringPrefixCountRichMapFunction.counts) {
- mapSum += l;
- }
+ long mapSum = 0;
+ for (long l : StringPrefixCountRichMapFunction.counts) {
+ mapSum += l;
+ }
- long countSum = 0;
- for (long l : StatefulCounterFunction.counts) {
- countSum += l;
- }
-
- long reduceInputCount = 0;
- for(long l: OnceFailingPrefixCounter.counts){
- reduceInputCount += l;
- }
-
- assertEquals(NUM_STRINGS, filterSum);
- assertEquals(NUM_STRINGS, mapSum);
- assertEquals(NUM_STRINGS, countSum);
- assertEquals(NUM_STRINGS, reduceInputCount);
- // verify that we counted exactly right
- for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
- assertEquals(new Long(NUM_STRINGS / 40), count);
- }
+ long countSum = 0;
+ for (long l : StatefulCounterFunction.counts) {
+ countSum += l;
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+
+ long reduceInputCount = 0;
+ for(long l: OnceFailingPrefixCounter.counts){
+ reduceInputCount += l;
+ }
+
+ assertEquals(NUM_STRINGS, filterSum);
+ assertEquals(NUM_STRINGS, mapSum);
+ assertEquals(NUM_STRINGS, countSum);
+ assertEquals(NUM_STRINGS, reduceInputCount);
+ // verify that we counted exactly right
+ for (Long count : OnceFailingPrefixCounter.prefixCounts.values()) {
+ assertEquals(new Long(NUM_STRINGS / 40), count);
}
}
-
+
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
@@ -246,27 +188,31 @@ public class StreamCheckpointingITCase {
}
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
- private OperatorState<Long> count;
+ private long count;
static final long[] counts = new long[PARALLELISM];
@Override
public PrefixCount map(PrefixCount value) throws Exception {
- count.update(count.value() + 1);
+ count++;
return value;
}
@Override
- public void open(Configuration conf) throws IOException {
- count = getRuntimeContext().getOperatorState("count", 0L, false);
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
}
@Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
}
-
}
private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> {
@@ -320,30 +266,6 @@ public class StreamCheckpointingITCase {
return value;
}
}
-
- // --------------------------------------------------------------------------------------------
- // Custom Type Classes
- // --------------------------------------------------------------------------------------------
-
- public static class PrefixCount implements Serializable {
-
- public String prefix;
- public String value;
- public long count;
-
- public PrefixCount() {}
-
- public PrefixCount(String prefix, String value, long count) {
- this.prefix = prefix;
- this.value = value;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return prefix + " / " + value;
- }
- }
private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4ca7df59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
new file mode 100644
index 0000000..2993315
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Test base for fault-tolerant streaming programs.
+ */
+@SuppressWarnings("serial")
+public abstract class StreamFaultToleranceTestBase {
+
+ protected static final int NUM_TASK_MANAGERS = 2;
+ protected static final int NUM_TASK_SLOTS = 3;
+ protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void startCluster() {
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+ cluster = new ForkableFlinkMiniCluster(config, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to start test cluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ try {
+ cluster.shutdown();
+ cluster = null;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to stop test cluster: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Implementations are expected to assemble the test topology in this function
+ * using the provided {@link StreamExecutionEnvironment}.
+ */
+ abstract public void testProgram(StreamExecutionEnvironment env);
+
+ /**
+ * Implementations are expected to provide the checks here that verify the correct behavior.
+ */
+ abstract public void postSubmit();
+
+ /**
+ * Runs the test program defined in {@link #testProgram(StreamExecutionEnvironment)},
+ * followed by the checks in {@link #postSubmit()}.
+ */
+ @Test
+ public void runCheckpointedProgram() {
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRPCPort());
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(500);
+ env.getConfig().disableSysoutLogging();
+
+ testProgram(env);
+
+ env.execute();
+
+ postSubmit();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Frequently used utilities
+ // --------------------------------------------------------------------------------------------
+
+ public static class PrefixCount implements Serializable {
+
+ public String prefix;
+ public String value;
+ public long count;
+
+ public PrefixCount() {}
+
+ public PrefixCount(String prefix, String value, long count) {
+ this.prefix = prefix;
+ this.value = value;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return prefix + " / " + value;
+ }
+ }
+
+}