You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:28 UTC

[8/9] flink git commit: [FLINK-1679] deprecate old parallelism config entry

[FLINK-1679] deprecate old parallelism config entry

old config parameter can still be used

OLD
parallelization.degree.default

NEW
parallelism.default


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/013ed82f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/013ed82f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/013ed82f

Branch: refs/heads/master
Commit: 013ed82ff3eccc0946d20a955d98524a7ca0f7e4
Parents: cf84bca
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 18 10:44:44 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Mar 23 09:03:56 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/configuration/ConfigConstants.java     | 9 ++++++++-
 .../src/main/java/org/apache/flink/optimizer/Optimizer.java | 9 +++++++--
 .../streaming/api/environment/StreamContextEnvironment.java | 7 ++++++-
 .../streaming/api/environment/StreamPlanEnvironment.java    | 7 ++++++-
 4 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b472d8a..09f55fd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -34,7 +34,14 @@ public final class ConfigConstants {
 	 * The config parameter defining the default parallelism for jobs.
 	 */
 	public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default";
-	
+
+	/**
+	 * The deprecated config parameter defining the default parallelism for jobs.
+	 */
+	@Deprecated
+	public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default";
+
+
 	/**
 	 * Config parameter for the number of re-tries for failed tasks. Setting this
 	 * value to 0 effectively disables fault tolerance.

http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index 90421b7..c80cfc2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -348,10 +348,15 @@ public class Optimizer {
 		this.costEstimator = estimator;
 
 		// determine the default parallelism
+		// check for old key string first, then for new one
 		this.defaultParallelism = GlobalConfiguration.getInteger(
-				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
 				ConfigConstants.DEFAULT_PARALLELISM);
-		
+		// now check for new one which overwrites old values
+		this.defaultParallelism = GlobalConfiguration.getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				this.defaultParallelism);
+
 		if (defaultParallelism < 1) {
 			LOG.warn("Config value " + defaultParallelism + " for option "
 					+ ConfigConstants.DEFAULT_PARALLELISM + " is invalid. Ignoring and using a value of 1.");

http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7ae78f1..f7dd0bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,9 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		} else {
+			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
 					ConfigConstants.DEFAULT_PARALLELISM));
+			// then for new
+			setParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					getParallelism()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 2cf5cc2..592fa1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -36,9 +36,14 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		} else {
+			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
 					ConfigConstants.DEFAULT_PARALLELISM));
+			// then for new
+			setParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					getParallelism()));
 		}
 	}