You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 14:56:47 UTC

[2/3] flink git commit: [streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs

[streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs

Closes #501


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6da9f26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6da9f26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6da9f26

Branch: refs/heads/master
Commit: a6da9f26d7128e3db697b183a53ade26afb40801
Parents: b6f183a
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:54:36 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 13:42:35 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/api/StreamGraph.java |  4 ++++
 .../flink/streaming/api/StreamingJobGraphGenerator.java  | 11 +++++++++--
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 948ea5e..71706bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -555,6 +555,10 @@ public class StreamGraph extends StreamingPlan {
 		return operatorNames.get(vertexID);
 	}
 
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	public void setMonitoringEnabled(boolean monitoringEnabled) {
 		this.monitoringEnabled = monitoringEnabled;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 0146448..544ccc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -80,8 +80,15 @@ public class StreamingJobGraphGenerator {
 		jobGraph.setJobType(JobGraph.JobType.STREAMING);
 		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
 		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
-		if (jobGraph.isMonitoringEnabled()) {
-			jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+
+		if(jobGraph.isMonitoringEnabled()) {
+			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+			if(executionRetries != -1) {
+				jobGraph.setNumberOfExecutionRetries(executionRetries);
+			}
+			else {
+				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);	
+			}
 		}
 		init();