You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:15 UTC
[4/9] flink git commit: [FLINK-2783] Remove "degreeOfParallelism" API calls
[FLINK-2783] Remove "degreeOfParallelism" API calls
This closes #1200
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2ea4e4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2ea4e4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2ea4e4d
Branch: refs/heads/master
Commit: f2ea4e4d67968359f7d0594cadac93582b397755
Parents: 82d6236
Author: zentol <s....@web.de>
Authored: Wed Sep 30 11:40:41 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:03:54 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 35 --------------------
.../flink/api/common/operators/Operator.java | 25 --------------
.../flink/api/java/CollectionEnvironment.java | 9 -----
.../flink/api/java/ExecutionEnvironment.java | 35 --------------------
.../api/java/operators/CoGroupRawOperator.java | 2 +-
.../src/main/java/YarnJob.java | 2 +-
.../src/main/java/YarnWordCount.java | 2 +-
.../flink/api/scala/ExecutionEnvironment.scala | 20 -----------
.../api/java/common/PlanBinder.java | 6 ++--
.../spargel/java/record/SpargelIteration.java | 8 -----
.../environment/StreamExecutionEnvironment.java | 33 ------------------
.../api/scala/StreamExecutionEnvironment.scala | 20 -----------
.../flink/tez/test/TezProgramTestBase.java | 2 +-
13 files changed, 7 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3233327..28f3b92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -206,23 +206,6 @@ public class ExecutionConfig implements Serializable {
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environment's default parallelism should be used.
- * @deprecated Please use {@link #getParallelism}
- */
- @Deprecated
- public int getDegreeOfParallelism() {
- return getParallelism();
- }
-
- /**
- * Gets the parallelism with which operation are executed by default. Operations can
- * individually override this value to use a specific parallelism.
- *
- * Other operations may need to run with a different parallelism - for example calling
- * a reduce operation over the entire data set will involve an operation that runs
- * with a parallelism of one (the final reduce to the single result value).
- *
- * @return The parallelism used by operations, unless they override that value. This method
- * returns {@code -1}, if the environment's default parallelism should be used.
*/
public int getParallelism() {
return parallelism;
@@ -239,24 +222,6 @@ public class ExecutionConfig implements Serializable {
* from a JAR file, the default parallelism is the one configured for that setup.
*
* @param parallelism The parallelism to use
- * @deprecated Please use {@link #setParallelism}
- */
- @Deprecated
- public ExecutionConfig setDegreeOfParallelism(int parallelism) {
- return setParallelism(parallelism);
- }
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
- * x parallel instances.
- * <p>
- * This method overrides the default parallelism for this environment.
- * The local execution environment uses by default a value equal to the number of hardware
- * contexts (CPU cores / threads). When executing the program via the command line client
- * from a JAR file, the default parallelism is the one configured for that setup.
- *
- * @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
if (parallelism < 1 && parallelism != -1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 840c253..19294d2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -163,19 +163,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
* Gets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. If this
* value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
- *
- * @return The parallelism.
- * @deprecated Please use {@link #getParallelism}
- */
- @Deprecated
- public int getDegreeOfParallelism() {
- return getParallelism();
- }
-
- /**
- * Gets the parallelism for this contract instance. The parallelism denotes
- * how many parallel instances of the user function will be spawned during the execution. If this
- * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
*
* @return The parallelism.
*/
@@ -187,18 +174,6 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
* Sets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. Set this
* value to <code>-1</code> to let the system decide on its own.
- *
- * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
- * @deprecated Please use {@link #setParallelism}
- */
- @Deprecated
- public void setDegreeOfParallelism(int parallelism) {
- setParallelism(parallelism);
- }
- /**
- * Sets the parallelism for this contract instance. The parallelism denotes
- * how many parallel instances of the user function will be spawned during the execution. Set this
- * value to <code>-1</code> to let the system decide on its own.
*
* @param parallelism The number of parallel instances to spawn. -1, if unspecified.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index dbb7cc0..b9e9f81 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -34,15 +34,6 @@ public class CollectionEnvironment extends ExecutionEnvironment {
return this.lastJobExecutionResult;
}
- /**
- * @deprecated Please use {@link #getParallelism}
- */
- @Override
- @Deprecated
- public int getDegreeOfParallelism() {
- return getParallelism();
- }
-
@Override
public int getParallelism() {
return 1; // always serial
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c69294d..a596765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -152,23 +152,6 @@ public abstract class ExecutionEnvironment {
* parallelism - for example calling
* {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
* set will insert eventually an operation that runs non-parallel (parallelism of one).
- *
- * @return The parallelism used by operations, unless they override that value. This method
- * returns {@code -1}, if the environments default parallelism should be used.
- * @deprecated Please use {@link #getParallelism}
- */
- @Deprecated
- public int getDegreeOfParallelism() {
- return getParallelism();
- }
-
- /**
- * Gets the parallelism with which operation are executed by default. Operations can
- * individually override this value to use a specific parallelism via
- * {@link Operator#setParallelism(int)}. Other operations may need to run with a different
- * parallelism - for example calling
- * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
- * set will insert eventually an operation that runs non-parallel (parallelism of one).
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environments default parallelism should be used.
@@ -176,24 +159,6 @@ public abstract class ExecutionEnvironment {
public int getParallelism() {
return config.getParallelism();
}
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
- * x parallel instances.
- * <p>
- * This method overrides the default parallelism for this environment.
- * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
- * contexts (CPU cores / threads). When executing the program via the command line client
- * from a JAR file, the default parallelism is the one configured for that setup.
- *
- * @param parallelism The parallelism
- * @deprecated Please use {@link #setParallelism}
- */
- @Deprecated
- public void setDegreeOfParallelism(int parallelism) {
- setParallelism(parallelism);
- }
/**
* Sets the parallelism for operations executed through this environment.
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
index 38326bd..30639c3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
@@ -102,7 +102,7 @@ public class CoGroupRawOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2,
po.setSecondInput(input2);
// set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ po.setParallelism(this.getParallelism());
return po;
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
index 51627d5..30885f1 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -42,7 +42,7 @@ public class YarnJob {
// To use Tez YARN execution, use
final RemoteTezEnvironment env = RemoteTezEnvironment.create();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
/**
* Here, you can start creating your execution plan for Flink.
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
index e97dc0b..5f9ef74 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
@@ -51,7 +51,7 @@ public class YarnWordCount {
// set up the execution environment
final RemoteTezEnvironment env = RemoteTezEnvironment.create();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
// get input data
DataSet<String> text = env.readTextFile(textPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 85c5410..3427225 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -82,18 +82,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
* [[DataSet.setParallelism]].
- * @deprecated Please use [[setParallelism]]
- */
- @deprecated
- def setDegreeOfParallelism(parallelism: Int): Unit = {
- setParallelism(parallelism)
- }
-
- /**
- * Sets the parallelism (parallelism) for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific operations using
- * [[DataSet.setParallelism]].
*/
def setParallelism(parallelism: Int): Unit = {
javaEnv.setParallelism(parallelism)
@@ -102,14 +90,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataSet.setParallelism]]
- * @deprecated Please use [[getParallelism]]
- */
- @deprecated
- def getDegreeOfParallelism = javaEnv.getParallelism
-
- /**
- * Returns the default parallelism for this execution environment. Note that this
- * value can be overridden by individual operations using [[DataSet.setParallelism]]
*/
def getParallelism = javaEnv.getParallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index 8ca0405..ca252f8 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -84,7 +84,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
case DOP:
Integer dop = (Integer) value.getField(1);
- env.setDegreeOfParallelism(dop);
+ env.setParallelism(dop);
break;
case MODE:
FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
@@ -98,8 +98,8 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
break;
}
}
- if (env.getDegreeOfParallelism() < 0) {
- env.setDegreeOfParallelism(1);
+ if (env.getParallelism() < 0) {
+ env.setParallelism(1);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 8f1839d..7c137c9 100644
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -128,14 +128,6 @@ public class SpargelIteration {
return this.iteration;
}
- /**
- * @deprecated Please use {@link #setParallelism}
- */
- @Deprecated
- public void setDegreeOfParallelism(int parallelism) {
- setParallelism(parallelism);
- }
-
public void setParallelism(int parallelism) {
this.iteration.setParallelism(parallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5537fd4..598d0df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -157,39 +157,6 @@ public abstract class StreamExecutionEnvironment {
*
* @param parallelism
* The parallelism
- * @deprecated Please use {@link #setParallelism}
- */
- @Deprecated
- public StreamExecutionEnvironment setDegreeOfParallelism(int parallelism) {
- return setParallelism(parallelism);
- }
-
- /**
- * Gets the parallelism with which operation are executed by default.
- * Operations can individually override this value to use a specific
- * parallelism.
- *
- * @return The parallelism used by operations, unless they override that
- * value.
- * @deprecated Please use {@link #getParallelism}
- */
- @Deprecated
- public int getDegreeOfParallelism() {
- return getParallelism();
- }
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as map,
- * batchReduce) to run with x parallel instances. This method overrides the
- * default parallelism for this environment. The
- * {@link LocalStreamEnvironment} uses by default a value equal to the
- * number of hardware contexts (CPU cores / threads). When executing the
- * program via the command line client from a JAR file, the default degree
- * of parallelism is the one configured for that setup.
- *
- * @param parallelism
- * The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
if (parallelism < 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e538435..2474d8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -51,26 +51,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
* [[DataStream#setParallelism(int)]].
- * @deprecated Please use [[setParallelism]]
- */
- @deprecated
- def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
- javaEnv.setParallelism(degreeOfParallelism)
- }
-
- /**
- * Returns the default parallelism for this execution environment. Note that this
- * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
- * @deprecated Please use [[getParallelism]]
- */
- @deprecated
- def getDegreeOfParallelism = javaEnv.getParallelism
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific operations using
- * [[DataStream#setParallelism(int)]].
*/
def setParallelism(parallelism: Int): Unit = {
javaEnv.setParallelism(parallelism)
http://git-wip-us.apache.org/repos/asf/flink/blob/f2ea4e4d/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
index e3c6f1b..6186a47 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -45,7 +45,7 @@ public abstract class TezProgramTestBase extends AbstractTestBase {
}
- public void setDegreeOfParallelism(int degreeOfParallelism) {
+ public void setParallelism(int degreeOfParallelism) {
this.degreeOfParallelism = degreeOfParallelism;
}