You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:14:54 UTC

[6/6] flink git commit: [tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure.

[tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fcc04ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fcc04ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fcc04ab

Branch: refs/heads/master
Commit: 3fcc04ab3583e14d9d0acd1e29adee900738ffde
Parents: 92b1e47
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 16 16:52:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 12:35:19 2015 +0200

----------------------------------------------------------------------
 .../checkpointing/StateCheckpoinedITCase.java   | 62 ++++++++++++--------
 .../StreamFaultToleranceTestBase.java           | 20 +------
 2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 072086b..2c2f2b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -22,30 +22,24 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -53,6 +47,10 @@ import static org.junit.Assert.fail;
  * The test triggers a failure after a while and verifies that, after completion, the
  * state defined with either the {@link OperatorState} or the {@link Checkpointed}
  * interface reflects the "exactly once" semantics.
+ * 
+ * The test throttles the input until at least two checkpoints are completed, to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead to correct
+ * results without testing the checkpointing).
  */
 @SuppressWarnings("serial")
 public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
@@ -63,17 +61,24 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 	 * Runs the following program:
 	 *
 	 * <pre>
-	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
 	@Override
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
+		final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
+		final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
+
+		final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+		
 		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 
 		stream
-				// -------------- first vertex, chained to the source ----------------
+				// first vertex, chained to the source
+				// this filter throttles the flow until at least one checkpoint
+				// is complete, to make sure this program does not run without checkpointing
 				.filter(new StringRichFilterFunction())
 
 						// -------------- seconds vertex - one-to-one connected ----------------
@@ -83,12 +88,16 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 
 						// -------------- third vertex - reducer and the sink ----------------
 				.partitionByHash("prefix")
-				.flatMap(new OnceFailingAggregator(NUM_STRINGS))
+				.flatMap(new OnceFailingAggregator(failurePos))
 				.addSink(new ValidatingSink());
 	}
 
 	@Override
 	public void postSubmit() {
+		
+		assertTrue("Test inconclusive: failure occurred before first checkpoint",
+				OnceFailingAggregator.wasCheckpointedBeforeFailure);
+		
 		long filterSum = 0;
 		for (long l : StringRichFilterFunction.counts) {
 			filterSum += l;
@@ -189,14 +198,15 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> 
+			implements Checkpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
-
+		
 		private long count;
-
+		
 		@Override
-		public boolean filter(String value) {
+		public boolean filter(String value) throws Exception {
 			count++;
 			return value.length() < 100; // should be always true
 		}
@@ -271,35 +281,34 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 	}
 	
 	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<HashMap<String, PrefixCount>> {
+		implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
 
+		static boolean wasCheckpointedBeforeFailure = false;
+		
 		private static volatile boolean hasFailed = false;
 
 		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
 		
-		private final long numElements;
-		
 		private long failurePos;
 		private long count;
 		
+		private boolean wasCheckpointed;
+		
 
-		OnceFailingAggregator(long numElements) {
-			this.numElements = numElements;
+		OnceFailingAggregator(long failurePos) {
+			this.failurePos = failurePos;
 		}
 		
 		@Override
 		public void open(Configuration parameters) {
-			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
 		}
 
 		@Override
 		public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
 			count++;
-			if (!hasFailed && count >= failurePos) {
+			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				wasCheckpointedBeforeFailure = wasCheckpointed;
 				hasFailed = true;
 				throw new Exception("Test Failure");
 			}
@@ -324,6 +333,11 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		public void restoreState(HashMap<String, PrefixCount> state) {
 			aggregationMap.putAll(state);
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			this.wasCheckpointed = true;
+		}
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 2993315..8920cf2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -18,38 +18,22 @@
 
 package org.apache.flink.test.checkpointing;
 
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 /**
  * Test base for fault tolerant streaming programs
  */
-@SuppressWarnings("serial")
 public abstract class StreamFaultToleranceTestBase {
 
 	protected static final int NUM_TASK_MANAGERS = 2;
@@ -127,6 +111,7 @@ public abstract class StreamFaultToleranceTestBase {
 	//  Frequently used utilities
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("serial")
 	public static class PrefixCount implements Serializable {
 
 		public String prefix;
@@ -146,5 +131,4 @@ public abstract class StreamFaultToleranceTestBase {
 			return prefix + " / " + value;
 		}
 	}
-
 }