You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 13:50:40 UTC

[2/4] flink git commit: [FLINK-2063] [streaming] Configure checkpoint coordinator to treat all vertices as stateful.

[FLINK-2063] [streaming] Configure checkpoint coordinator to treat all vertices as stateful.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85453b64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85453b64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85453b64

Branch: refs/heads/master
Commit: 85453b64185914d5889b194a5e9614cf1da9e9fe
Parents: 68f41a0
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 20 16:40:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 20 20:16:51 2015 +0200

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  34 ++---
 .../StreamCheckpointingITCase.java              | 149 ++++++++++++-------
 2 files changed, 107 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85453b64/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ef5ffca..6bad4c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -364,34 +364,26 @@ public class StreamingJobGraphGenerator {
 				throw new IllegalArgumentException("The checkpoint interval must be positive");
 			}
 
-			// gather source and sink IDs
-			HashSet<JobVertexID> sourceIds = new HashSet<JobVertexID>();
-			HashSet<JobVertexID> sinkIds = new HashSet<JobVertexID>();
-			for (AbstractJobVertex vertex : jobVertices.values()) {
-				if (vertex.isInputVertex()) {
-					sourceIds.add(vertex.getID());
-				}
-				if (vertex.isOutputVertex()) {
-					sinkIds.add(vertex.getID());
-				}
-			}
-
-			HashSet<JobVertexID> sourceorSink = new HashSet<JobVertexID>();
-			sourceorSink.addAll(sourceIds);
-			sourceorSink.addAll(sinkIds);
-			
 			// collect the vertices that receive "trigger checkpoint" messages.
 			// currently, these are all the sources
-			List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>(sourceIds);
+			List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
 
 			// collect the vertices that need to acknowledge the checkpoint
-			// currently, these are the sources and sinks
-			// the sources acknowledge their state backup, the sinks the arrival of the barriers
-			List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(sourceorSink);
+			// currently, these are all vertices
+			List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
 
 			// collect the vertices that receive "commit checkpoint" messages
 			// currently, these are only the sources
-			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(sourceIds);
+			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
+			
+			
+			for (AbstractJobVertex vertex : jobVertices.values()) {
+				if (vertex.isInputVertex()) {
+					triggerVertices.add(vertex.getID());
+					commitVertices.add(vertex.getID());
+				}
+				ackVertices.add(vertex.getID());
+			}
 
 			JobSnapshottingSettings settings = new JobSnapshottingSettings(
 					triggerVertices, ackVertices, commitVertices, interval);

http://git-wip-us.apache.org/repos/asf/flink/blob/85453b64/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 124a1fd..d88f3fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,9 +18,15 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -28,20 +34,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
+ * 
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {
@@ -84,9 +91,7 @@ public class StreamCheckpointingITCase {
 	 * Runs the following program:
 	 *
 	 * <pre>
-	 *
-	 *     (source)  ->  (filter)  ->  (map)  ->  (groupBy / reduce)  -> (sink)
-	 *
+	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
 	@Test
@@ -94,50 +99,20 @@ public class StreamCheckpointingITCase {
 
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
+		final String COUNT_ACCUMULATOR = "count-acc";
+		
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 																	"localhost", cluster.getJobManagerRPCPort());
 			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(1000);
+			env.enableCheckpointing(500);
 			env.getConfig().disableSysoutLogging();
 
-			DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
-
-				private Random rnd;
-				private StringBuilder stringBuilder;
-
-				private int index;
-				private int step;
-				private boolean running = true;
-
-				@Override
-				public void open(Configuration parameters) {
-					rnd = new Random();
-					stringBuilder = new StringBuilder();
-					step = getRuntimeContext().getNumberOfParallelSubtasks();
-					index = getRuntimeContext().getIndexOfThisSubtask();
-				}
-
-				@Override
-				public boolean reachedEnd() throws Exception {
-					return index >= NUM_STRINGS;
-				}
-
-				@Override
-				public String next() throws Exception {
-					char first = (char) ((index % 40) + 40);
-
-					stringBuilder.setLength(0);
-					stringBuilder.append(first);
-
-					String result = randomString(stringBuilder, rnd);
-					index += step;
-					return result;
-				}
-
-			});
-
+			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction());
+			
 			stream
+					// -------------- first vertex, chained to the source ----------------
+					
 					.filter(new FilterFunction<String>() {
 						@Override
 						public boolean filter(String value) {
@@ -151,6 +126,28 @@ public class StreamCheckpointingITCase {
 							return new PrefixCount(value.substring(0, 1), value, 1L);
 						}
 					})
+
+					// -------------- second vertex - the stateful one ----------------
+					
+					.startNewChain()
+					.map(new RichMapFunction<PrefixCount, PrefixCount>() {
+						
+						private long count = 0;
+						
+						@Override
+						public PrefixCount map(PrefixCount value) {
+							count++;
+							return value;
+						}
+
+						@Override
+						public void close() {
+							getRuntimeContext().getLongCounter(COUNT_ACCUMULATOR).add(count);
+						}
+					})
+
+					// -------------- third vertex - the sink ----------------
+					
 					.groupBy("prefix")
 					.reduce(new ReduceFunction<PrefixCount>() {
 						@Override
@@ -180,28 +177,70 @@ public class StreamCheckpointingITCase {
 								assertEquals(NUM_STRINGS / 40, count.longValue());
 							}
 						}
-
-			});
-
-			env.execute();
+					});
+
+			JobExecutionResult result = env.execute();
+			
+			Long totalCount = (Long) result.getAllAccumulatorResults().get(COUNT_ACCUMULATOR);
+			
+			assertNotNull("TotalCount accumulator not set", totalCount);
+			assertEquals(NUM_STRINGS, totalCount.longValue());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+	
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> {
 
-	private static String randomString(StringBuilder bld, Random rnd) {
-		final int len = rnd.nextInt(10) + 5;
+		private Random rnd;
+		private StringBuilder stringBuilder;
 
-		for (int i = 0; i < len; i++) {
-			char next = (char) (rnd.nextInt(20000) + 33);
-			bld.append(next);
+		private int index;
+		private int step;
+
+		@Override
+		public void open(Configuration parameters) {
+			rnd = new Random();
+			stringBuilder = new StringBuilder();
+			step = getRuntimeContext().getNumberOfParallelSubtasks();
+			index = getRuntimeContext().getIndexOfThisSubtask();
 		}
 
-		return bld.toString();
-	}
+		@Override
+		public boolean reachedEnd() throws Exception {
+			return index >= NUM_STRINGS;
+		}
+
+		@Override
+		public String next() throws Exception {
+			char first = (char) ((index % 40) + 40);
+
+			stringBuilder.setLength(0);
+			stringBuilder.append(first);
 
+			String result = randomString(stringBuilder, rnd);
+			index += step;
+			return result;
+		}
+
+		private static String randomString(StringBuilder bld, Random rnd) {
+			final int len = rnd.nextInt(10) + 5;
+
+			for (int i = 0; i < len; i++) {
+				char next = (char) (rnd.nextInt(20000) + 33);
+				bld.append(next);
+			}
+
+			return bld.toString();
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Custom Type Classes
 	// --------------------------------------------------------------------------------------------