You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/11/03 16:33:39 UTC
[2/3] flink git commit: [FLINK-2930] Respect ExecutionConfig execution retry delay
[FLINK-2930] Respect ExecutionConfig execution retry delay
- fix hard-coded defaults
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/630aae6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/630aae6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/630aae6a
Branch: refs/heads/release-0.10
Commit: 630aae6a26ee8c8bfb96b9b8baab5a148b671fe3
Parents: bd81abb
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Oct 27 18:15:15 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Nov 3 16:33:17 2015 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 2 +-
.../flink/api/java/ExecutionEnvironment.java | 25 --------------------
.../flink/runtime/jobmanager/JobManager.scala | 10 +++++++-
.../flink/api/scala/ExecutionEnvironment.scala | 16 -------------
.../environment/StreamExecutionEnvironment.java | 24 -------------------
.../api/graph/StreamingJobGraphGenerator.java | 14 +++--------
.../api/scala/StreamExecutionEnvironment.scala | 17 -------------
7 files changed, 13 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index b9ebaf7..b620796 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -248,7 +248,7 @@ public class ExecutionConfig implements Serializable {
}
/**
- * @return The delay between retires.
+ * Returns the delay between execution retries.
*/
public long getExecutionRetryDelay() {
return executionRetryDelay;
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 283d6d4..5405d4e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,31 +199,6 @@ public abstract class ExecutionEnvironment {
}
/**
- * Sets the delay that failed tasks are re-executed in milliseconds. A value of
- * zero effectively disables fault tolerance. A value of {@code -1}
- * indicates that the system default value (as defined in the configuration)
- * should be used.
- *
- * @param executionRetryDelay
- * The delay of time the system will wait to re-execute failed
- * tasks.
- */
- public void setExecutionRetryDelay(long executionRetryDelay) {
- config.setExecutionRetryDelay(executionRetryDelay);
- }
-
- /**
- * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
- * A value of {@code -1} indicates that the system default value (as defined
- * in the configuration) should be used.
- *
- * @return The delay time the system will wait to re-execute failed tasks.
- */
- public long getExecutionRetryDelay() {
- return config.getExecutionRetryDelay();
- }
-
- /**
* Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
*
* @return The execution result from the latest job execution.
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d9b69ad..567e67b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -818,8 +818,16 @@ class JobManager(
} else {
defaultExecutionRetries
}
+
+ val executionRetryDelay = if (jobGraph.getExecutionRetryDelay() >= 0) {
+ jobGraph.getExecutionRetryDelay()
+ }
+ else {
+ delayBetweenRetries
+ }
+
executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
- executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+ executionGraph.setDelayBeforeRetrying(executionRetryDelay)
executionGraph.setScheduleMode(jobGraph.getScheduleMode())
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e27d55a..9238f5c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -110,22 +110,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
/**
- * Sets the delay that failed tasks are re-executed. A value of
- * zero effectively disables fault tolerance. A value of "-1"
- * indicates that the system default value (as defined in the configuration)
- * should be used.
- */
- def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
- javaEnv.setExecutionRetryDelay(executionRetryDelay)
- }
-
- /**
- * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
- * A value of "-1" indicates that the system default value (as defined
- * in the configuration) should be used.
- */
- def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
- /**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 210447d..3c961f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -426,30 +426,6 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the delay that failed tasks are re-executed. A value of
- * zero effectively disables fault tolerance. A value of {@code -1}
- * indicates that the system default value (as defined in the configuration)
- * should be used.
- *
- * @param executionRetryDelay
- * The delay of time the system will wait to re-execute failed
- * tasks.
- */
- public void setExecutionRetryDelay(long executionRetryDelay) {
- config.setExecutionRetryDelay(executionRetryDelay);
- }
-
- /**
- * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
- * A value of {@code -1} indicates that the system default value (as defined
- * in the configuration) should be used.
- *
- * @return The delay time the system will wait to re-execute failed tasks.
- */
- public long getExecutionRetryDelay() {
- return config.getExecutionRetryDelay();
- }
- /**
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 45cfff1..6881d3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,7 +99,7 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
configureExecutionRetries();
-
+
configureExecutionRetryDelay();
try {
@@ -416,10 +416,6 @@ public class StreamingJobGraphGenerator {
if(executionRetries == -1) {
streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
}
- long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
- if(executionRetryDelay == -1) {
- streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
- }
}
}
@@ -432,13 +428,9 @@ public class StreamingJobGraphGenerator {
jobGraph.setNumberOfExecutionRetries(0);
}
}
-
+
private void configureExecutionRetryDelay() {
long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
- if (executionRetryDelay != -1) {
- jobGraph.setExecutionRetryDelay(executionRetryDelay);
- } else {
- jobGraph.setExecutionRetryDelay(100 * 1000);
- }
+ jobGraph.setExecutionRetryDelay(executionRetryDelay);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/630aae6a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e953696..f632240 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -233,23 +233,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
- /**
- * Sets the delay that failed tasks are re-executed. A value of
- * zero effectively disables fault tolerance. A value of "-1"
- * indicates that the system default value (as defined in the configuration)
- * should be used.
- */
- def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
- javaEnv.setExecutionRetryDelay(executionRetryDelay)
- }
-
- /**
- * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
- * A value of "-1" indicates that the system default value (as defined
- * in the configuration) should be used.
- */
- def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
-
// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------