You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 14:56:47 UTC
[2/3] flink git commit: [streaming] [FLINK-1740] Pass config param
for numberOfExecutionRetries to the JobGraph for streaming jobs
[streaming] [FLINK-1740] Pass config param for numberOfExecutionRetries to the JobGraph for streaming jobs
Closes #501
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6da9f26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6da9f26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6da9f26
Branch: refs/heads/master
Commit: a6da9f26d7128e3db697b183a53ade26afb40801
Parents: b6f183a
Author: Paris Carbone <se...@gmail.com>
Authored: Wed Mar 18 19:54:36 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 13:42:35 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/streaming/api/StreamGraph.java | 4 ++++
.../flink/streaming/api/StreamingJobGraphGenerator.java | 11 +++++++++--
2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 948ea5e..71706bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -555,6 +555,10 @@ public class StreamGraph extends StreamingPlan {
return operatorNames.get(vertexID);
}
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
public void setMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6da9f26/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 0146448..544ccc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -80,8 +80,15 @@ public class StreamingJobGraphGenerator {
jobGraph.setJobType(JobGraph.JobType.STREAMING);
jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
- if (jobGraph.isMonitoringEnabled()) {
- jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+
+ if(jobGraph.isMonitoringEnabled()) {
+ int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+ if(executionRetries != -1) {
+ jobGraph.setNumberOfExecutionRetries(executionRetries);
+ }
+ else {
+ jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+ }
}
init();