You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:14 UTC
[09/10] flink git commit: [hotfix] [tests] Speed up CoStreamCheckpointingITCase
[hotfix] [tests] Speed up CoStreamCheckpointingITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a26b0f08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a26b0f08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a26b0f08
Branch: refs/heads/master
Commit: a26b0f0826dd84cded650280f3b8262e628814da
Parents: 7280df4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 14:53:13 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:44 2016 +0100
----------------------------------------------------------------------
.../CoStreamCheckpointingITCase.java | 172 +++++++++++--------
1 file changed, 104 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a26b0f08/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index c503a1f..51a00b9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -21,17 +21,23 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
+import org.junit.Test;
+
import java.io.IOException;
+import java.io.Serializable;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -50,10 +56,11 @@ import static org.junit.Assert.assertTrue;
* The test triggers a failure after a while and verifies that, after completion, the
* state reflects the "exactly once" semantics.
*/
-@SuppressWarnings("serial")
-public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+@SuppressWarnings({"serial", "deprecation"})
+public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase {
- final long NUM_STRINGS = 10_000_000L;
+ private static final long NUM_STRINGS = 10_000L;
+ private static final int PARALLELISM = 4;
/**
* Runs the following program:
@@ -62,12 +69,16 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
* [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
- @Override
- public void testProgram(StreamExecutionEnvironment env) {
-
+ @Test
+ public void testCoStreamCheckpointingProgram() throws Exception {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(50);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS, NUM_STRINGS / 5));
stream
// -------------- first vertex, chained to the source ----------------
@@ -91,10 +102,11 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// Do nothing here
}
});
- }
- @Override
- public void postSubmit() {
+ TestUtils.tryExecute(env, "Fault Tolerance Test");
+
+ // validate the result
+
long filterSum = 0;
for (long l : StringRichFilterFunction.counts) {
filterSum += l;
@@ -115,14 +127,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
countSum += l;
}
- if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
- System.err.println("Test inconclusive: Restore was never called on counting Map function.");
- }
-
- if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
- System.err.println("Test inconclusive: Restore was never called on counting CoMap function.");
- }
-
// verify that we counted exactly right
assertEquals(NUM_STRINGS, filterSum);
assertEquals(NUM_STRINGS, coMapSum);
@@ -135,41 +139,42 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// Custom Functions
// --------------------------------------------------------------------------------------------
- private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+ /**
+ * A generating source that emits slowly until the first two checkpoints have completed;
+ * once it reaches a configured position, it stalls until those checkpoints complete or it is cancelled.
+ *
+ * After both checkpoints have completed, it continues at full speed.
+ */
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+ implements Checkpointed<Integer>, CheckpointListener {
- static final long[] counts = new long[PARALLELISM];
-
- private final long numElements;
+ private static volatile int numCompletedCheckpoints = 0;
- private Random rnd;
- private StringBuilder stringBuilder;
+ private final long numElements;
+ private final long checkpointLatestAt;
- private int index;
- private int step;
+ private int index = -1;
private volatile boolean isRunning = true;
-
- StringGeneratingSourceFunction(long numElements) {
+ StringGeneratingSourceFunction(long numElements, long checkpointLatestAt) {
this.numElements = numElements;
- }
-
- @Override
- public void open(Configuration parameters) throws IOException {
- rnd = new Random();
- stringBuilder = new StringBuilder();
-
- step = getRuntimeContext().getNumberOfParallelSubtasks();
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
+ this.checkpointLatestAt = checkpointLatestAt;
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
+ final Random rnd = new Random();
+ final StringBuilder stringBuilder = new StringBuilder();
+
final Object lockingObject = ctx.getCheckpointLock();
+ final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+ if (index < 0) {
+ // not been restored, so initialize
+ index =getRuntimeContext().getIndexOfThisSubtask();
+ }
+
while (isRunning && index < numElements) {
char first = (char) ((index % 40) + 40);
@@ -178,23 +183,49 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
String result = randomString(stringBuilder, rnd);
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (lockingObject) {
index += step;
ctx.collect(result);
}
+
+ if (numCompletedCheckpoints < 2) {
+ // not yet completed enough checkpoints, so slow down
+ if (index < checkpointLatestAt) {
+ // mild slow down
+ Thread.sleep(1);
+ } else {
+ // wait until the checkpoints are completed
+ while (isRunning && numCompletedCheckpoints < 2) {
+ Thread.sleep(5);
+ }
+ }
+ }
}
}
@Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
- }
-
- @Override
public void cancel() {
isRunning = false;
}
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+ numCompletedCheckpoints++;
+ }
+ }
+
private static String randomString(StringBuilder bld, Random rnd) {
final int len = rnd.nextInt(10) + 5;
@@ -205,16 +236,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
return bld.toString();
}
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
- }
-
- @Override
- public void restoreState(Integer state) {
- index = state;
- }
}
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
@@ -284,9 +305,10 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
- Long count = 0L;
static final long[] counts = new long[PARALLELISM];
+ private long count = 0L;
+
@Override
public boolean filter(String value) {
count++;
@@ -310,10 +332,9 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+
static final long[] counts = new long[PARALLELISM];
- static volatile boolean restoreCalledAtLeastOnce = false;
-
+
private long count;
@Override
@@ -329,11 +350,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@Override
public void restoreState(Long state) {
- restoreCalledAtLeastOnce = true;
count = state;
- if (count == 0) {
- throw new RuntimeException("Restore from beginning");
- }
}
@Override
@@ -345,8 +362,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
static final long[] counts = new long[PARALLELISM];
- static volatile boolean restoreCalledAtLeastOnce = false;
-
+
private long count;
@Override
@@ -368,7 +384,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@Override
public void restoreState(Long state) {
- restoreCalledAtLeastOnce = true;
count = state;
}
@@ -377,4 +392,25 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
}
}
+
+ public static class PrefixCount implements Serializable {
+
+ public String prefix;
+ public String value;
+ public long count;
+
+ @SuppressWarnings("unused")
+ public PrefixCount() {}
+
+ public PrefixCount(String prefix, String value, long count) {
+ this.prefix = prefix;
+ this.value = value;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return prefix + " / " + value;
+ }
+ }
}