You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 13:50:40 UTC
[2/4] flink git commit: [FLINK-2063] [streaming] Configure checkpoint
coordinator to treat all vertices as stateful.
[FLINK-2063] [streaming] Configure checkpoint coordinator to treat all vertices as stateful.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85453b64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85453b64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85453b64
Branch: refs/heads/master
Commit: 85453b64185914d5889b194a5e9614cf1da9e9fe
Parents: 68f41a0
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 20 16:40:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 20 20:16:51 2015 +0200
----------------------------------------------------------------------
.../api/graph/StreamingJobGraphGenerator.java | 34 ++---
.../StreamCheckpointingITCase.java | 149 ++++++++++++-------
2 files changed, 107 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85453b64/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ef5ffca..6bad4c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -364,34 +364,26 @@ public class StreamingJobGraphGenerator {
throw new IllegalArgumentException("The checkpoint interval must be positive");
}
- // gather source and sink IDs
- HashSet<JobVertexID> sourceIds = new HashSet<JobVertexID>();
- HashSet<JobVertexID> sinkIds = new HashSet<JobVertexID>();
- for (AbstractJobVertex vertex : jobVertices.values()) {
- if (vertex.isInputVertex()) {
- sourceIds.add(vertex.getID());
- }
- if (vertex.isOutputVertex()) {
- sinkIds.add(vertex.getID());
- }
- }
-
- HashSet<JobVertexID> sourceorSink = new HashSet<JobVertexID>();
- sourceorSink.addAll(sourceIds);
- sourceorSink.addAll(sinkIds);
-
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
- List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>(sourceIds);
+ List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
// collect the vertices that need to acknowledge the checkpoint
- // currently, these are the sources and sinks
- // the sources acknowledge their state backup, the sinks the arrival of the barriers
- List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(sourceorSink);
+ // currently, these are all vertices
+ List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are only the sources
- List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(sourceIds);
+ List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
+
+
+ for (AbstractJobVertex vertex : jobVertices.values()) {
+ if (vertex.isInputVertex()) {
+ triggerVertices.add(vertex.getID());
+ commitVertices.add(vertex.getID());
+ }
+ ackVertices.add(vertex.getID());
+ }
JobSnapshottingSettings settings = new JobSnapshottingSettings(
triggerVertices, ackVertices, commitVertices, interval);
http://git-wip-us.apache.org/repos/asf/flink/blob/85453b64/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 124a1fd..d88f3fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,9 +18,15 @@
package org.apache.flink.test.checkpointing;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -28,20 +34,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
public class StreamCheckpointingITCase {
@@ -84,9 +91,7 @@ public class StreamCheckpointingITCase {
* Runs the following program:
*
* <pre>
- *
- * (source) -> (filter) -> (map) -> (groupBy / reduce) -> (sink)
- *
+ * [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
@Test
@@ -94,50 +99,20 @@ public class StreamCheckpointingITCase {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+ final String COUNT_ACCUMULATOR = "count-acc";
+
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
- env.enableCheckpointing(1000);
+ env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
- DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
-
- private Random rnd;
- private StringBuilder stringBuilder;
-
- private int index;
- private int step;
- private boolean running = true;
-
- @Override
- public void open(Configuration parameters) {
- rnd = new Random();
- stringBuilder = new StringBuilder();
- step = getRuntimeContext().getNumberOfParallelSubtasks();
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return index >= NUM_STRINGS;
- }
-
- @Override
- public String next() throws Exception {
- char first = (char) ((index % 40) + 40);
-
- stringBuilder.setLength(0);
- stringBuilder.append(first);
-
- String result = randomString(stringBuilder, rnd);
- index += step;
- return result;
- }
-
- });
-
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction());
+
stream
+ // -------------- first vertex, chained to the source ----------------
+
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) {
@@ -151,6 +126,28 @@ public class StreamCheckpointingITCase {
return new PrefixCount(value.substring(0, 1), value, 1L);
}
})
+
+ // -------------- seconds vertex - the stateful one ----------------
+
+ .startNewChain()
+ .map(new RichMapFunction<PrefixCount, PrefixCount>() {
+
+ private long count = 0;
+
+ @Override
+ public PrefixCount map(PrefixCount value) {
+ count++;
+ return value;
+ }
+
+ @Override
+ public void close() {
+ getRuntimeContext().getLongCounter(COUNT_ACCUMULATOR).add(count);
+ }
+ })
+
+ // -------------- third vertex - the sink ----------------
+
.groupBy("prefix")
.reduce(new ReduceFunction<PrefixCount>() {
@Override
@@ -180,28 +177,70 @@ public class StreamCheckpointingITCase {
assertEquals(NUM_STRINGS / 40, count.longValue());
}
}
-
- });
-
- env.execute();
+ });
+
+ JobExecutionResult result = env.execute();
+
+ Long totalCount = (Long) result.getAllAccumulatorResults().get(COUNT_ACCUMULATOR);
+
+ assertNotNull("TotalCount accumulator not set", totalCount);
+ assertEquals(NUM_STRINGS, totalCount.longValue());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> {
- private static String randomString(StringBuilder bld, Random rnd) {
- final int len = rnd.nextInt(10) + 5;
+ private Random rnd;
+ private StringBuilder stringBuilder;
- for (int i = 0; i < len; i++) {
- char next = (char) (rnd.nextInt(20000) + 33);
- bld.append(next);
+ private int index;
+ private int step;
+
+ @Override
+ public void open(Configuration parameters) {
+ rnd = new Random();
+ stringBuilder = new StringBuilder();
+ step = getRuntimeContext().getNumberOfParallelSubtasks();
+ index = getRuntimeContext().getIndexOfThisSubtask();
}
- return bld.toString();
- }
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return index >= NUM_STRINGS;
+ }
+
+ @Override
+ public String next() throws Exception {
+ char first = (char) ((index % 40) + 40);
+
+ stringBuilder.setLength(0);
+ stringBuilder.append(first);
+ String result = randomString(stringBuilder, rnd);
+ index += step;
+ return result;
+ }
+
+ private static String randomString(StringBuilder bld, Random rnd) {
+ final int len = rnd.nextInt(10) + 5;
+
+ for (int i = 0; i < len; i++) {
+ char next = (char) (rnd.nextInt(20000) + 33);
+ bld.append(next);
+ }
+
+ return bld.toString();
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Custom Type Classes
// --------------------------------------------------------------------------------------------