You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/07 18:32:46 UTC
[4/8] flink git commit: [FLINK-1837] [streaming] Throw Exception for
checkpointed iterative programs
[FLINK-1837] [streaming] Throw Exception for checkpointed iterative programs
Checkpointing currently does not support this special case
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52ebb295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52ebb295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52ebb295
Branch: refs/heads/release-0.9.0-milestone-1
Commit: 52ebb295c6782e5cc9c7747656c278849ec9030a
Parents: 954beca
Author: mbalassi <mb...@apache.org>
Authored: Tue Apr 7 17:04:39 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 7 17:04:39 2015 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/StreamGraph.java | 7 ++++
.../apache/flink/streaming/api/IterateTest.java | 37 ++++++++++++++++----
2 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 351dec9..aa71804 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -481,6 +481,11 @@ public class StreamGraph extends StreamingPlan {
*/
public JobGraph getJobGraph(String jobGraphName) {
+ // temporarily forbid checkpointing for iterative jobs
+ if (isIterative() && isCheckpointingEnabled()){
+ throw new UnsupportedOperationException("Checkpointing is currently not supported for iterative jobs!");
+ }
+
this.jobName = jobGraphName;
WindowingOptimizer.optimizeGraph(this);
@@ -558,6 +563,8 @@ public class StreamGraph extends StreamingPlan {
return iterationTimeouts.get(vertexID);
}
+ public boolean isIterative() { return !iterationIds.isEmpty(); }
+
public String getOperatorName(Integer vertexID) {
return operatorNames.get(vertexID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/52ebb295/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index a64c4b1..31bd147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -29,11 +29,13 @@ import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class IterateTest {
private static final long MEMORYSIZE = 32;
private static boolean iterated[];
+ private static int PARALLELISM = 2;
public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> {
@@ -73,14 +75,10 @@ public class IterateTest {
}
}
- @Test
- public void test() throws Exception {
- int parallelism = 2;
- StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE);
- iterated = new boolean[parallelism];
+ public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
env.setBufferTimeout(10);
- DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism, false));
+ DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
IterativeDataStream<Boolean> iteration = source.iterate(3000);
@@ -88,6 +86,15 @@ public class IterateTest {
new IterationTail());
iteration.closeWith(increment).addSink(new MySink());
+ return env;
+ }
+
+ @Test
+ public void test() throws Exception {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ iterated = new boolean[PARALLELISM];
+
+ env = constructIterativeJob(env);
env.execute();
@@ -97,4 +104,22 @@ public class IterateTest {
}
+ @Test
+ public void testWithCheckPointing() throws Exception {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+ env = constructIterativeJob(env);
+
+ env.enableCheckpointing();
+ try {
+ env.execute();
+
+ // this statement should never be reached
+ fail();
+ } catch (UnsupportedOperationException e) {
+ // expected behaviour
+ }
+
+ }
+
}