You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:15 UTC

[4/9] flink git commit: [FLINK-2783] Remove "degreeOfParallelism" API calls

[FLINK-2783] Remove "degreeOfParallelism" API calls

This closes #1200


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2ea4e4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2ea4e4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2ea4e4d

Branch: refs/heads/master
Commit: f2ea4e4d67968359f7d0594cadac93582b397755
Parents: 82d6236
Author: zentol <s....@web.de>
Authored: Wed Sep 30 11:40:41 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:03:54 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 35 --------------------
 .../flink/api/common/operators/Operator.java    | 25 --------------
 .../flink/api/java/CollectionEnvironment.java   |  9 -----
 .../flink/api/java/ExecutionEnvironment.java    | 35 --------------------
 .../api/java/operators/CoGroupRawOperator.java  |  2 +-
 .../src/main/java/YarnJob.java                  |  2 +-
 .../src/main/java/YarnWordCount.java            |  2 +-
 .../flink/api/scala/ExecutionEnvironment.scala  | 20 -----------
 .../api/java/common/PlanBinder.java             |  6 ++--
 .../spargel/java/record/SpargelIteration.java   |  8 -----
 .../environment/StreamExecutionEnvironment.java | 33 ------------------
 .../api/scala/StreamExecutionEnvironment.scala  | 20 -----------
 .../flink/tez/test/TezProgramTestBase.java      |  2 +-
 13 files changed, 7 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3233327..28f3b92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -206,23 +206,6 @@ public class ExecutionConfig implements Serializable {
 	 *
 	 * @return The parallelism used by operations, unless they override that value. This method
 	 *         returns {@code -1}, if the environment's default parallelism should be used.
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
-	/**
-	 * Gets the parallelism with which operation are executed by default. Operations can
-	 * individually override this value to use a specific parallelism.
-	 *
-	 * Other operations may need to run with a different parallelism - for example calling
-	 * a reduce operation over the entire data set will involve an operation that runs
-	 * with a parallelism of one (the final reduce to the single result value).
-	 *
-	 * @return The parallelism used by operations, unless they override that value. This method
-	 *         returns {@code -1}, if the environment's default parallelism should be used.
 	 */
 	public int getParallelism() {
 		return parallelism;
@@ -239,24 +222,6 @@ public class ExecutionConfig implements Serializable {
 	 * from a JAR file, the default parallelism is the one configured for that setup.
 	 *
 	 * @param parallelism The parallelism to use
-	 * @deprecated Please use {@link #setParallelism}
-	 */
-	@Deprecated
-	public ExecutionConfig setDegreeOfParallelism(int parallelism) {
-		return setParallelism(parallelism);
-	}
-
-	/**
-	 * Sets the parallelism for operations executed through this environment.
-	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
-	 * x parallel instances.
-	 * <p>
-	 * This method overrides the default parallelism for this environment.
-	 * The local execution environment uses by default a value equal to the number of hardware
-	 * contexts (CPU cores / threads). When executing the program via the command line client
-	 * from a JAR file, the default parallelism is the one configured for that setup.
-	 *
-	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
 		if (parallelism < 1 && parallelism != -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 840c253..19294d2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -163,19 +163,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	 * Gets the parallelism for this contract instance. The parallelism denotes
 	 * how many parallel instances of the user function will be spawned during the execution. If this
 	 * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
-	 * 
-	 * @return The parallelism.
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
-	/**
-	 * Gets the parallelism for this contract instance. The parallelism denotes
-	 * how many parallel instances of the user function will be spawned during the execution. If this
-	 * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
 	 *
 	 * @return The parallelism.
 	 */
@@ -187,18 +174,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	 * Sets the parallelism for this contract instance. The parallelism denotes
 	 * how many parallel instances of the user function will be spawned during the execution. Set this
 	 * value to <code>-1</code> to let the system decide on its own.
-	 * 
-	 * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
-	 * @deprecated Please use {@link #setParallelism}
-	 */
-	@Deprecated
-	public void setDegreeOfParallelism(int parallelism) {
-		setParallelism(parallelism);
-	}
-	/**
-	 * Sets the parallelism for this contract instance. The parallelism denotes
-	 * how many parallel instances of the user function will be spawned during the execution. Set this
-	 * value to <code>-1</code> to let the system decide on its own.
 	 *
 	 * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index dbb7cc0..b9e9f81 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -34,15 +34,6 @@ public class CollectionEnvironment extends ExecutionEnvironment {
 		return this.lastJobExecutionResult;
 	}
 
-	/**
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Override
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
 	@Override
 	public int getParallelism() {
 		return 1; // always serial

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c69294d..a596765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -152,23 +152,6 @@ public abstract class ExecutionEnvironment {
 	 * parallelism - for example calling
 	 * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
 	 * set will insert eventually an operation that runs non-parallel (parallelism of one).
-	 * 
-	 * @return The parallelism used by operations, unless they override that value. This method
-	 *         returns {@code -1}, if the environments default parallelism should be used.
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
-	/**
-	 * Gets the parallelism with which operation are executed by default. Operations can
-	 * individually override this value to use a specific parallelism via
-	 * {@link Operator#setParallelism(int)}. Other operations may need to run with a different
-	 * parallelism - for example calling
-	 * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
-	 * set will insert eventually an operation that runs non-parallel (parallelism of one).
 	 *
 	 * @return The parallelism used by operations, unless they override that value. This method
 	 *         returns {@code -1}, if the environments default parallelism should be used.
@@ -176,24 +159,6 @@ public abstract class ExecutionEnvironment {
 	public int getParallelism() {
 		return config.getParallelism();
 	}
-	
-	/**
-	 * Sets the parallelism for operations executed through this environment.
-	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
-	 * x parallel instances.
-	 * <p>
-	 * This method overrides the default parallelism for this environment.
-	 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
-	 * contexts (CPU cores / threads). When executing the program via the command line client 
-	 * from a JAR file, the default parallelism is the one configured for that setup.
-	 * 
-	 * @param parallelism The parallelism
-	 * @deprecated Please use {@link #setParallelism}
-	 */
-	@Deprecated
-	public void setDegreeOfParallelism(int parallelism) {
-		setParallelism(parallelism);
-	}
 
 	/**
 	 * Sets the parallelism for operations executed through this environment.

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
index 38326bd..30639c3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
@@ -102,7 +102,7 @@ public class CoGroupRawOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2,
 			po.setSecondInput(input2);
 
 			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			po.setParallelism(this.getParallelism());
 
 			return po;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
index 51627d5..30885f1 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -42,7 +42,7 @@ public class YarnJob {
 		
 		// To use Tez YARN execution, use
 		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-		env.setDegreeOfParallelism(8);
+		env.setParallelism(8);
 
 		/**
 		 * Here, you can start creating your execution plan for Flink.

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
index e97dc0b..5f9ef74 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
@@ -51,7 +51,7 @@ public class YarnWordCount {
 		
 		// set up the execution environment
 		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-		env.setDegreeOfParallelism(parallelism);
+		env.setParallelism(parallelism);
 
 		// get input data
 		DataSet<String> text = env.readTextFile(textPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 85c5410..3427225 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -82,18 +82,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
    * [[DataSet.setParallelism]].
-   * @deprecated Please use [[setParallelism]]
-   */
-  @deprecated
-  def setDegreeOfParallelism(parallelism: Int): Unit = {
-    setParallelism(parallelism)
-  }
-
-  /**
-   * Sets the parallelism (parallelism) for operations executed through this environment.
-   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
-   * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataSet.setParallelism]].
    */
   def setParallelism(parallelism: Int): Unit = {
     javaEnv.setParallelism(parallelism)
@@ -102,14 +90,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Returns the default parallelism for this execution environment. Note that this
    * value can be overridden by individual operations using [[DataSet.setParallelism]]
-   * @deprecated Please use [[getParallelism]]
-   */
-  @deprecated
-  def getDegreeOfParallelism = javaEnv.getParallelism
-
-  /**
-   * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataSet.setParallelism]]
    */
   def getParallelism = javaEnv.getParallelism
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index 8ca0405..ca252f8 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -84,7 +84,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
 				case DOP:
 					Integer dop = (Integer) value.getField(1);
-					env.setDegreeOfParallelism(dop);
+					env.setParallelism(dop);
 					break;
 				case MODE:
 					FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
@@ -98,8 +98,8 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 					break;
 			}
 		}
-		if (env.getDegreeOfParallelism() < 0) {
-			env.setDegreeOfParallelism(1);
+		if (env.getParallelism() < 0) {
+			env.setParallelism(1);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 8f1839d..7c137c9 100644
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -128,14 +128,6 @@ public class SpargelIteration {
 		return this.iteration;
 	}
 
-	/**
-	 * @deprecated Please use {@link #setParallelism}
-	 */
-	@Deprecated
-	public void setDegreeOfParallelism(int parallelism) {
-		setParallelism(parallelism);
-	}
-
 	public void setParallelism(int parallelism) {
 		this.iteration.setParallelism(parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5537fd4..598d0df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -157,39 +157,6 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @param parallelism
 	 * 		The parallelism
-	 * @deprecated Please use {@link #setParallelism}
-	 */
-	@Deprecated
-	public StreamExecutionEnvironment setDegreeOfParallelism(int parallelism) {
-		return setParallelism(parallelism);
-	}
-
-	/**
-	 * Gets the parallelism with which operation are executed by default.
-	 * Operations can individually override this value to use a specific
-	 * parallelism.
-	 *
-	 * @return The parallelism used by operations, unless they override that
-	 * value.
-	 * @deprecated Please use {@link #getParallelism}
-	 */
-	@Deprecated
-	public int getDegreeOfParallelism() {
-		return getParallelism();
-	}
-
-	/**
-	 * Sets the parallelism for operations executed through this environment.
-	 * Setting a parallelism of x here will cause all operators (such as map,
-	 * batchReduce) to run with x parallel instances. This method overrides the
-	 * default parallelism for this environment. The
-	 * {@link LocalStreamEnvironment} uses by default a value equal to the
-	 * number of hardware contexts (CPU cores / threads). When executing the
-	 * program via the command line client from a JAR file, the default degree
-	 * of parallelism is the one configured for that setup.
-	 *
-	 * @param parallelism
-	 * 		The parallelism
 	 */
 	public StreamExecutionEnvironment setParallelism(int parallelism) {
 		if (parallelism < 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e538435..2474d8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -51,26 +51,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
    * [[DataStream#setParallelism(int)]].
-   * @deprecated Please use [[setParallelism]]
-   */
-  @deprecated
-  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setParallelism(degreeOfParallelism)
-  }
-
-  /**
-   * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
-   * @deprecated Please use [[getParallelism]]
-   */
-  @deprecated
-  def getDegreeOfParallelism = javaEnv.getParallelism
-
-  /**
-   * Sets the parallelism for operations executed through this environment.
-   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
-   * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream#setParallelism(int)]].
    */
   def setParallelism(parallelism: Int): Unit = {
     javaEnv.setParallelism(parallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
index e3c6f1b..6186a47 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -45,7 +45,7 @@ public abstract class TezProgramTestBase extends AbstractTestBase {
     }
 
 
-    public void setDegreeOfParallelism(int degreeOfParallelism) {
+    public void setParallelism(int degreeOfParallelism) {
         this.degreeOfParallelism = degreeOfParallelism;
     }