You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/07 18:32:46 UTC

[4/8] flink git commit: [FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs

[FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs

Checkpointing currently does not support this special case


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52ebb295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52ebb295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52ebb295

Branch: refs/heads/release-0.9.0-milestone-1
Commit: 52ebb295c6782e5cc9c7747656c278849ec9030a
Parents: 954beca
Author: mbalassi <mb...@apache.org>
Authored: Tue Apr 7 17:04:39 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 7 17:04:39 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamGraph.java |  7 ++++
 .../apache/flink/streaming/api/IterateTest.java | 37 ++++++++++++++++----
 2 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 351dec9..aa71804 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -481,6 +481,11 @@ public class StreamGraph extends StreamingPlan {
 	 */
 	public JobGraph getJobGraph(String jobGraphName) {
 
+		// temporarily forbid checkpointing for iterative jobs
+		if (isIterative() && isCheckpointingEnabled()){
+			throw new UnsupportedOperationException("Checkpointing is currently not supported for iterative jobs!");
+		}
+
 		this.jobName = jobGraphName;
 
 		WindowingOptimizer.optimizeGraph(this);
@@ -558,6 +563,8 @@ public class StreamGraph extends StreamingPlan {
 		return iterationTimeouts.get(vertexID);
 	}
 
+	public boolean isIterative() { return !iterationIds.isEmpty(); }
+
 	public String getOperatorName(Integer vertexID) {
 		return operatorNames.get(vertexID);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index a64c4b1..31bd147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -29,11 +29,13 @@ import org.junit.Test;
 import java.util.Collections;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class IterateTest {
 
 	private static final long MEMORYSIZE = 32;
 	private static boolean iterated[];
+	private static int PARALLELISM = 2;
 
 	public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> {
 
@@ -73,14 +75,10 @@ public class IterateTest {
 		}
 	}
 
-	@Test
-	public void test() throws Exception {
-		int parallelism = 2;
-		StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE);
-		iterated = new boolean[parallelism];
+	public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
 		env.setBufferTimeout(10);
 
-		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism, false));
+		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
 
 		IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
@@ -88,6 +86,15 @@ public class IterateTest {
 				new IterationTail());
 
 		iteration.closeWith(increment).addSink(new MySink());
+		return env;
+	}
+
+	@Test
+	public void test() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		iterated = new boolean[PARALLELISM];
+
+		env = constructIterativeJob(env);
 
 		env.execute();
 
@@ -97,4 +104,22 @@ public class IterateTest {
 
 	}
 
+	@Test
+	public void testWithCheckPointing() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+		env = constructIterativeJob(env);
+
+		env.enableCheckpointing();
+		try {
+			env.execute();
+
+			// this statement should never be reached
+			fail();
+		} catch (UnsupportedOperationException e) {
+			// expected behaviour
+		}
+
+	}
+
 }