You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:14 UTC

[09/10] flink git commit: [hotfix] [tests] Speed up CoStreamCheckpointingITCase

[hotfix] [tests] Speed up CoStreamCheckpointingITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a26b0f08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a26b0f08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a26b0f08

Branch: refs/heads/master
Commit: a26b0f0826dd84cded650280f3b8262e628814da
Parents: 7280df4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 14:53:13 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:44 2016 +0100

----------------------------------------------------------------------
 .../CoStreamCheckpointingITCase.java            | 172 +++++++++++--------
 1 file changed, 104 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a26b0f08/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index c503a1f..51a00b9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -21,17 +21,23 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Collector;
 
+import org.junit.Test;
+
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -50,10 +56,11 @@ import static org.junit.Assert.assertTrue;
  * The test triggers a failure after a while and verifies that, after completion, the
  * state reflects the "exactly once" semantics.
  */
-@SuppressWarnings("serial")
-public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+@SuppressWarnings({"serial", "deprecation"})
+public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase {
 
-	final long NUM_STRINGS = 10_000_000L;
+	private static final long NUM_STRINGS = 10_000L;
+	private static final int PARALLELISM = 4;
 
 	/**
 	 * Runs the following program:
@@ -62,12 +69,16 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-
+	@Test
+	public void testCoStreamCheckpointingProgram() throws Exception {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+		env.enableCheckpointing(50);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS, NUM_STRINGS / 5));
 
 		stream
 				// -------------- first vertex, chained to the source ----------------
@@ -91,10 +102,11 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 						// Do nothing here
 					}
 				});
-	}
 
-	@Override
-	public void postSubmit() {
+		TestUtils.tryExecute(env, "Fault Tolerance Test");
+
+		// validate the result
+
 		long filterSum = 0;
 		for (long l : StringRichFilterFunction.counts) {
 			filterSum += l;
@@ -115,14 +127,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			countSum += l;
 		}
 
-		if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
-			System.err.println("Test inconclusive: Restore was never called on counting Map function.");
-		}
-
-		if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
-			System.err.println("Test inconclusive: Restore was never called on counting CoMap function.");
-		}
-
 		// verify that we counted exactly right
 		assertEquals(NUM_STRINGS, filterSum);
 		assertEquals(NUM_STRINGS, coMapSum);
@@ -135,41 +139,42 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
 
-	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+	/**
+	 * A generating source that is slowed down until the first two checkpoints have completed;
+	 * once it reaches {@code checkpointLatestAt}, it stalls until those two checkpoints are done.
+	 *
+	 * <p>After the checkpoints have completed, it continues at full speed.
+	 */
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements Checkpointed<Integer>, CheckpointListener {
 
-		static final long[] counts = new long[PARALLELISM];
-		
-		private final long numElements;
+		private static volatile int numCompletedCheckpoints = 0;
 
-		private Random rnd;
-		private StringBuilder stringBuilder;
+		private final long numElements;
+		private final long checkpointLatestAt;
 
-		private int index;
-		private int step;
+		private int index = -1;
 
 		private volatile boolean isRunning = true;
 
-		
-		StringGeneratingSourceFunction(long numElements) {
+		StringGeneratingSourceFunction(long numElements, long checkpointLatestAt) {
 			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws IOException {
-			rnd = new Random();
-			stringBuilder = new StringBuilder();
-			
-			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
+			this.checkpointLatestAt = checkpointLatestAt;
 		}
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
+			final Random rnd = new Random();
+			final StringBuilder stringBuilder = new StringBuilder();
+
 			final Object lockingObject = ctx.getCheckpointLock();
 
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+			if (index < 0) {
+				// not restored from a checkpoint, so start at this subtask's own offset
+				index =getRuntimeContext().getIndexOfThisSubtask();
+			}
+
 			while (isRunning && index < numElements) {
 				char first = (char) ((index % 40) + 40);
 
@@ -178,23 +183,49 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 				String result = randomString(stringBuilder, rnd);
 
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
 				synchronized (lockingObject) {
 					index += step;
 					ctx.collect(result);
 				}
+
+				if (numCompletedCheckpoints < 2) {
+					// not yet completed enough checkpoints, so slow down
+					if (index < checkpointLatestAt) {
+						// mild slow down
+						Thread.sleep(1);
+					} else {
+						// wait until the checkpoints are completed
+						while (isRunning && numCompletedCheckpoints < 2) {
+							Thread.sleep(5);
+						}
+					}
+				}
 			}
 		}
 
 		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
-		}
-		
-		@Override
 		public void cancel() {
 			isRunning = false;
 		}
 
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				numCompletedCheckpoints++;
+			}
+		}
+
 		private static String randomString(StringBuilder bld, Random rnd) {
 			final int len = rnd.nextInt(10) + 5;
 
@@ -205,16 +236,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			return bld.toString();
 		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
 	}
 
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
@@ -284,9 +305,10 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
 
-		Long count = 0L;
 		static final long[] counts = new long[PARALLELISM];
 
+		private long count = 0L;
+
 		@Override
 		public boolean filter(String value) {
 			count++;
@@ -310,10 +332,9 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-		
+
 		static final long[] counts = new long[PARALLELISM];
-		static volatile boolean restoreCalledAtLeastOnce = false;
-		
+
 		private long count;
 
 		@Override
@@ -329,11 +350,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		@Override
 		public void restoreState(Long state) {
-			restoreCalledAtLeastOnce = true;
 			count = state;
-			if (count == 0) {
-				throw new RuntimeException("Restore from beginning");
-			}
 		}
 
 		@Override
@@ -345,8 +362,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
-		static volatile boolean restoreCalledAtLeastOnce = false;
-		
+
 		private long count;
 
 		@Override
@@ -368,7 +384,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		@Override
 		public void restoreState(Long state) {
-			restoreCalledAtLeastOnce = true;
 			count = state;
 		}
 
@@ -377,4 +392,25 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 	}
+
+	public static class PrefixCount implements Serializable {
+
+		public String prefix;
+		public String value;
+		public long count;
+
+		@SuppressWarnings("unused")
+		public PrefixCount() {}
+
+		public PrefixCount(String prefix, String value, long count) {
+			this.prefix = prefix;
+			this.value = value;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return prefix + " / " + value;
+		}
+	}
 }