You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:14:54 UTC
[6/6] flink git commit: [tests] Reinforce StateCheckpoinedITCase to
make sure actual checkpointing has happened before a failure.
[tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fcc04ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fcc04ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fcc04ab
Branch: refs/heads/master
Commit: 3fcc04ab3583e14d9d0acd1e29adee900738ffde
Parents: 92b1e47
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 16 16:52:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 12:35:19 2015 +0200
----------------------------------------------------------------------
.../checkpointing/StateCheckpoinedITCase.java | 62 ++++++++++++--------
.../StreamFaultToleranceTestBase.java | 20 +------
2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 072086b..2c2f2b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -22,30 +22,24 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
import java.io.IOException;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
@@ -53,6 +47,10 @@ import static org.junit.Assert.fail;
* The test triggers a failure after a while and verifies that, after completion, the
* state defined with either the {@link OperatorState} or the {@link Checkpointed}
* interface reflects the "exactly once" semantics.
+ *
+ * The test throttles the input until at least two checkpoints are completed, to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead to correct
+ * results without testing the checkpointing).
*/
@SuppressWarnings("serial")
public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
@@ -63,17 +61,24 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
* Runs the following program:
*
* <pre>
- * [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+ * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+ final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
+ final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
+
+ final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
- // -------------- first vertex, chained to the source ----------------
+ // first vertex, chained to the source
+ // this filter throttles the flow until at least one checkpoint
+ // is complete, to make sure this program does not run without
.filter(new StringRichFilterFunction())
// -------------- seconds vertex - one-to-one connected ----------------
@@ -83,12 +88,16 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
// -------------- third vertex - reducer and the sink ----------------
.partitionByHash("prefix")
- .flatMap(new OnceFailingAggregator(NUM_STRINGS))
+ .flatMap(new OnceFailingAggregator(failurePos))
.addSink(new ValidatingSink());
}
@Override
public void postSubmit() {
+
+ assertTrue("Test inconclusive: failure occurred before first checkpoint",
+ OnceFailingAggregator.wasCheckpointedBeforeFailure);
+
long filterSum = 0;
for (long l : StringRichFilterFunction.counts) {
filterSum += l;
@@ -189,14 +198,15 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
}
}
- private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+ private static class StringRichFilterFunction extends RichFilterFunction<String>
+ implements Checkpointed<Long> {
static final long[] counts = new long[PARALLELISM];
-
+
private long count;
-
+
@Override
- public boolean filter(String value) {
+ public boolean filter(String value) throws Exception {
count++;
return value.length() < 100; // should be always true
}
@@ -271,35 +281,34 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
}
private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<HashMap<String, PrefixCount>> {
+ implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
+ static boolean wasCheckpointedBeforeFailure = false;
+
private static volatile boolean hasFailed = false;
private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
- private final long numElements;
-
private long failurePos;
private long count;
+ private boolean wasCheckpointed;
+
- OnceFailingAggregator(long numElements) {
- this.numElements = numElements;
+ OnceFailingAggregator(long failurePos) {
+ this.failurePos = failurePos;
}
@Override
public void open(Configuration parameters) {
- long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
- long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
- failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
}
@Override
public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
count++;
- if (!hasFailed && count >= failurePos) {
+ if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+ wasCheckpointedBeforeFailure = wasCheckpointed;
hasFailed = true;
throw new Exception("Test Failure");
}
@@ -324,6 +333,11 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
public void restoreState(HashMap<String, PrefixCount> state) {
aggregationMap.putAll(state);
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.wasCheckpointed = true;
+ }
}
private static class ValidatingSink extends RichSinkFunction<PrefixCount>
http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 2993315..8920cf2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -18,38 +18,22 @@
package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
/**
* Test base for fault tolerant streaming programs
*/
-@SuppressWarnings("serial")
public abstract class StreamFaultToleranceTestBase {
protected static final int NUM_TASK_MANAGERS = 2;
@@ -127,6 +111,7 @@ public abstract class StreamFaultToleranceTestBase {
// Frequently used utilities
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("serial")
public static class PrefixCount implements Serializable {
public String prefix;
@@ -146,5 +131,4 @@ public abstract class StreamFaultToleranceTestBase {
return prefix + " / " + value;
}
}
-
}