You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:28 UTC
[8/9] flink git commit: [FLINK-1679] deprecate old parallelism config
entry
[FLINK-1679] deprecate old parallelism config entry
old config parameter can still be used
OLD
parallelization.degree.default
NEW
parallelism.default
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/013ed82f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/013ed82f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/013ed82f
Branch: refs/heads/master
Commit: 013ed82ff3eccc0946d20a955d98524a7ca0f7e4
Parents: cf84bca
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 18 10:44:44 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Mar 23 09:03:56 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/configuration/ConfigConstants.java | 9 ++++++++-
.../src/main/java/org/apache/flink/optimizer/Optimizer.java | 9 +++++++--
.../streaming/api/environment/StreamContextEnvironment.java | 7 ++++++-
.../streaming/api/environment/StreamPlanEnvironment.java | 7 ++++++-
4 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b472d8a..09f55fd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -34,7 +34,14 @@ public final class ConfigConstants {
* The config parameter defining the default parallelism for jobs.
*/
public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default";
-
+
+ /**
+ * The deprecated config parameter defining the default parallelism for jobs.
+ */
+ @Deprecated
+ public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default";
+
+
/**
* Config parameter for the number of re-tries for failed tasks. Setting this
* value to 0 effectively disables fault tolerance.
http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index 90421b7..c80cfc2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -348,10 +348,15 @@ public class Optimizer {
this.costEstimator = estimator;
// determine the default parallelism
+ // check for old key string first, then for new one
this.defaultParallelism = GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM);
-
+ // now check for new one which overwrites old values
+ this.defaultParallelism = GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ this.defaultParallelism);
+
if (defaultParallelism < 1) {
LOG.warn("Config value " + defaultParallelism + " for option "
+ ConfigConstants.DEFAULT_PARALLELISM + " is invalid. Ignoring and using a value of 1.");
http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7ae78f1..f7dd0bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,9 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
if (parallelism > 0) {
setParallelism(parallelism);
} else {
+ // first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM));
+ // then for new
+ setParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ getParallelism()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 2cf5cc2..592fa1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -36,9 +36,14 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
if (parallelism > 0) {
setParallelism(parallelism);
} else {
+ // first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM));
+ // then for new
+ setParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ getParallelism()));
}
}