You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:24 UTC
[4/9] flink git commit: [FLINK-1679] use a consistent name for
parallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 7f468fa..87d7779 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -178,8 +178,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
// set input
po.setInput(input);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
return po;
}
@@ -195,8 +195,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
// set input
po.setInput(input);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
SingleInputSemanticProperties props = new SingleInputSemanticProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 00761ec..1291181 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -72,7 +72,7 @@ import scala.reflect.ClassTag
* are named similarly. All functions are available in package
* `org.apache.flink.api.common.functions`.
*
- * The elements are partitioned depending on the degree of parallelism of the
+ * The elements are partitioned depending on the parallelism of the
* [[ExecutionEnvironment]] or of one specific DataSet.
*
* Most of the operations have an implicit [[TypeInformation]] parameter. This is supplied by
@@ -149,13 +149,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
}
/**
- * Sets the degree of parallelism of this operation. This must be greater than 1.
+ * Sets the parallelism of this operation. This must be at least 1.
*/
- def setParallelism(dop: Int) = {
+ def setParallelism(parallelism: Int) = {
javaSet match {
- case ds: DataSource[_] => ds.setParallelism(dop)
- case op: Operator[_, _] => op.setParallelism(dop)
- case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(dop)
+ case ds: DataSource[_] => ds.setParallelism(parallelism)
+ case op: Operator[_, _] => op.setParallelism(parallelism)
+ case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(parallelism)
case _ =>
throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
"parallelism.")
@@ -164,7 +164,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
}
/**
- * Returns the degree of parallelism of this operation.
+ * Returns the parallelism of this operation.
*/
def getParallelism: Int = javaSet match {
case ds: DataSource[_] => ds.getParallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cccea78..4c1e627 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -78,9 +78,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
- * Sets the degree of parallelism (DOP) for operations executed through this environment.
- * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
- * x parallel instances. This value can be overridden by specific operations using
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific operations using
* [[DataSet.setParallelism]].
* @deprecated Please use [[setParallelism]]
*/
@@ -90,8 +90,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
- * Returns the default degree of parallelism for this execution environment. Note that this
- * value can be overridden by individual operations using [[DataSet.setParallelism]
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific operations using
+ * [[DataSet.setParallelism]].
*/
def setParallelism(parallelism: Int): Unit = {
javaEnv.setParallelism(parallelism)
@@ -432,7 +434,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* because the framework may move the elements into the cluster if needed.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a degree of parallelism of one.
+ * a parallelism of one.
*/
def fromCollection[T: ClassTag : TypeInformation](
data: Seq[T]): DataSet[T] = {
@@ -453,7 +455,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* framework might move into the cluster if needed.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a degree of parallelism of one.
+ * a parallelism of one.
*/
def fromCollection[T: ClassTag : TypeInformation] (
data: Iterator[T]): DataSet[T] = {
@@ -473,7 +475,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* same type and must be serializable.
*
* * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a degree of parallelism of one.
+ * a parallelism of one.
*/
def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
Validate.notNull(data, "Data must not be null.")
@@ -610,10 +612,10 @@ object ExecutionEnvironment {
* of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
*/
def createLocalEnvironment(
- degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors())
+ parallelism: Int = Runtime.getRuntime.availableProcessors())
: ExecutionEnvironment = {
val javaEnv = JavaEnv.createLocalEnvironment()
- javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ javaEnv.setParallelism(parallelism)
new ExecutionEnvironment(javaEnv)
}
@@ -630,8 +632,8 @@ object ExecutionEnvironment {
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program to
* a cluster for execution. Note that all file paths used in the program must be accessible from
- * the cluster. The execution will use the cluster's default degree of parallelism, unless the
- * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
+ * the cluster. The execution will use the cluster's default parallelism, unless the
+ * parallelism is set explicitly via [[ExecutionEnvironment.setParallelism()]].
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
@@ -649,12 +651,12 @@ object ExecutionEnvironment {
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible
- * from the cluster. The execution will use the specified degree of parallelism.
+ * from the cluster. The execution will use the specified parallelism.
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
- * @param degreeOfParallelism The degree of parallelism to use during the execution.
+ * @param parallelism The parallelism to use during the execution.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
* program uses
* user-defined functions, user-defined input formats, or any libraries,
@@ -664,10 +666,10 @@ object ExecutionEnvironment {
def createRemoteEnvironment(
host: String,
port: Int,
- degreeOfParallelism: Int,
+ parallelism: Int,
jarFiles: String*): ExecutionEnvironment = {
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
- javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ javaEnv.setParallelism(parallelism)
new ExecutionEnvironment(javaEnv)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 6eaa9ae..c54ee0c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -175,17 +175,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
}
/**
- * Sets the degree of parallelism for the iteration.
+ * Sets the parallelism for the iteration.
*
- * @param parallelism The degree of parallelism.
+ * @param parallelism The parallelism.
*/
public void setParallelism(int parallelism) {
- Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+ Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
this.parallelism = parallelism;
}
/**
- * Gets the iteration's degree of parallelism.
+ * Gets the iteration's parallelism.
*
* @return The iterations parallelism, or -1, if not set.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 18826b6..6c195cb 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -75,7 +75,7 @@ public class DegreesWithExceptionITCase {
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
@@ -100,7 +100,7 @@ public class DegreesWithExceptionITCase {
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -125,7 +125,7 @@ public class DegreesWithExceptionITCase {
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -150,7 +150,7 @@ public class DegreesWithExceptionITCase {
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
@@ -175,7 +175,7 @@ public class DegreesWithExceptionITCase {
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(PARALLELISM);
+ env.setParallelism(PARALLELISM);
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index b6650d2..bbb7503 100644
--- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -31,7 +31,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
- this.setDegreeOfParallelism(4);
+ this.setParallelism(4);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 7eee629..9b4aeea 100644
--- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -32,7 +32,7 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
- this.setDegreeOfParallelism(4);
+ this.setParallelism(4);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
index 28fdfa6..95e2a25 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
@@ -33,7 +33,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
def testMapElementToPolynomialVectorSpace (): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism (2)
+ env.setParallelism (2)
val input = Seq (
LabeledVector (DenseVector (1), 1.0),
@@ -64,7 +64,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
def testMapVectorToPolynomialVectorSpace(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(2)
+ env.setParallelism(2)
val input = Seq(
LabeledVector(DenseVector(2, 3), 1.0),
@@ -96,7 +96,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(2)
+ env.setParallelism(2)
val input = Seq(
LabeledVector(DenseVector(2, 3), 1.0),
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
index 6e324cb..d783ecb 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
@@ -34,7 +34,7 @@ class ALSITCase extends ShouldMatchers {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(2)
+ env.setParallelism(2)
val als = ALS()
.setIterations(iterations)
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
index eb825b9..15292b7 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
@@ -33,7 +33,7 @@ class MultipleLinearRegressionITCase extends ShouldMatchers {
def testEstimationOfLinearFunction(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(2)
+ env.setParallelism(2)
val learner = MultipleLinearRegression()
@@ -69,7 +69,7 @@ class MultipleLinearRegressionITCase extends ShouldMatchers {
def testEstimationOfCubicFunction(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(2)
+ env.setParallelism(2)
val polynomialBase = PolynomialBase()
val learner = MultipleLinearRegression()
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 4f84467..d054975 100644
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -217,17 +217,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
}
/**
- * Sets the degree of parallelism for the iteration.
+ * Sets the parallelism for the iteration.
*
- * @param parallelism The degree of parallelism.
+ * @param parallelism The parallelism.
*/
public void setParallelism(int parallelism) {
- Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+ Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
this.parallelism = parallelism;
}
/**
- * Gets the iteration's degree of parallelism.
+ * Gets the iteration's parallelism.
*
* @return The iterations parallelism, or -1, if not set.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 019345f..38c26f0 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -48,7 +48,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
public void testSpargelCompiler() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
// compose test program
{
DataSet<Long> vertexIds = env.generateSequence(1, 2);
@@ -116,7 +116,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
// compose test program
{
DataSet<Long> bcVar = env.fromElements(1L);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
index b31618c..cd48dcd 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -49,7 +49,7 @@ public class SpargelTranslationTest {
;
final int NUM_ITERATIONS = 13;
- final int ITERATION_DOP = 77;
+ final int ITERATION_parallelism = 77;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -75,7 +75,7 @@ public class SpargelTranslationTest {
vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
vertexIteration.setName(ITERATION_NAME);
- vertexIteration.setParallelism(ITERATION_DOP);
+ vertexIteration.setParallelism(ITERATION_parallelism);
vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
@@ -93,7 +93,7 @@ public class SpargelTranslationTest {
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_DOP, iteration.getParallelism());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
assertEquals(ITERATION_NAME, iteration.getName());
assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
@@ -129,7 +129,7 @@ public class SpargelTranslationTest {
;
final int NUM_ITERATIONS = 13;
- final int ITERATION_DOP = 77;
+ final int ITERATION_parallelism = 77;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -154,7 +154,7 @@ public class SpargelTranslationTest {
vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
vertexIteration.setName(ITERATION_NAME);
- vertexIteration.setParallelism(ITERATION_DOP);
+ vertexIteration.setParallelism(ITERATION_parallelism);
vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
@@ -172,7 +172,7 @@ public class SpargelTranslationTest {
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_DOP, iteration.getParallelism());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
assertEquals(ITERATION_NAME, iteration.getName());
assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 6a95b0c..1bf2962 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -34,7 +34,7 @@ public class KafkaConsumerExample {
return;
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
DataStream<String> kafkaStream = env
.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index e7abf11..b1e0f0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -36,7 +36,7 @@ public class KafkaProducerExample {
return;
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 2fd31ed..d43460b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -34,7 +34,7 @@ public class KafkaSimpleConsumerExample {
return;
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
DataStream<String> kafkaStream = env
.addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index a094b89..95609f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -183,7 +183,7 @@ public class KafkaITCase {
stream.addSink(new KafkaSink<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema()));
try {
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
} catch (JobExecutionException good) {
Throwable t = good.getCause();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index a6eade8..a9ae77a 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -404,7 +404,7 @@ public class ConnectedDataStream<IN1, IN2> {
dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable,
getInputType1(), getInputType2(), outTypeInfo, functionName,
- environment.getDegreeOfParallelism());
+ environment.getParallelism());
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5768101..59ef108 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -103,7 +103,7 @@ public class DataStream<OUT> {
protected final StreamExecutionEnvironment environment;
protected final Integer id;
protected final String type;
- protected int degreeOfParallelism;
+ protected int parallelism;
protected List<String> userDefinedNames;
protected StreamPartitioner<OUT> partitioner;
@SuppressWarnings("rawtypes")
@@ -133,7 +133,7 @@ public class DataStream<OUT> {
this.id = counter;
this.type = operatorType;
this.environment = environment;
- this.degreeOfParallelism = environment.getDegreeOfParallelism();
+ this.parallelism = environment.getParallelism();
this.streamGraph = environment.getStreamGraph();
this.userDefinedNames = new ArrayList<String>();
this.partitioner = new DistributePartitioner<OUT>(true);
@@ -152,7 +152,7 @@ public class DataStream<OUT> {
this.environment = dataStream.environment;
this.id = dataStream.id;
this.type = dataStream.type;
- this.degreeOfParallelism = dataStream.degreeOfParallelism;
+ this.parallelism = dataStream.parallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.partitioner = dataStream.partitioner;
this.streamGraph = dataStream.streamGraph;
@@ -177,12 +177,12 @@ public class DataStream<OUT> {
}
/**
- * Gets the degree of parallelism for this operator.
+ * Gets the parallelism for this operator.
*
* @return The parallelism set for this operator.
*/
public int getParallelism() {
- return this.degreeOfParallelism;
+ return this.parallelism;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 0dda976..b8e0a7d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -41,11 +41,11 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
}
@Override
- public DataStreamSource<OUT> setParallelism(int dop) {
- if (dop > 1 && !isParallel) {
+ public DataStreamSource<OUT> setParallelism(int parallelism) {
+ if (parallelism > 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + this.id + " is not a parallel source");
} else {
- return (DataStreamSource<OUT>) super.setParallelism(dop);
+ return (DataStreamSource<OUT>) super.setParallelism(parallelism);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 7832777..2ac60fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -216,7 +216,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
return out;
} else if (transformation == WindowTransformation.REDUCEWINDOW
&& parallelism != discretizedStream.getExecutionEnvironment()
- .getDegreeOfParallelism()) {
+ .getParallelism()) {
DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 16284d4..8ffe1fc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -62,20 +62,20 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
}
/**
- * Sets the degree of parallelism for this operator. The degree must be 1 or
+ * Sets the parallelism for this operator. The parallelism must be 1 or
* more.
*
- * @param dop
- * The degree of parallelism for this operator.
- * @return The operator with set degree of parallelism.
+ * @param parallelism
+ * The parallelism for this operator.
+ * @return The operator with set parallelism.
*/
- public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
- if (dop < 1) {
+ public SingleOutputStreamOperator<OUT, O> setParallelism(int parallelism) {
+ if (parallelism < 1) {
throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
}
- this.degreeOfParallelism = dop;
+ this.parallelism = parallelism;
- streamGraph.setParallelism(id, degreeOfParallelism);
+ streamGraph.setParallelism(id, parallelism);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index efbbda9..784d20c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -394,7 +394,7 @@ public class WindowedDataStream<OUT> {
}
/**
- * Returns the degree of parallelism for the stream discretizer. The
+ * Returns the parallelism for the stream discretizer. The
* returned parallelism is either 1 for for non-parallel global policies (or
* when the input stream is non-parallel), environment parallelism for the
* policies that can run in parallel (such as, any ditributed policy, reduce
@@ -408,7 +408,7 @@ public class WindowedDataStream<OUT> {
return isLocal
|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
- || (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+ || (discretizerKey != null) ? dataStream.environment.getParallelism() : 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4824fca..e1b1453 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -29,7 +29,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public void execute() throws Exception {
- ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getDegreeOfParallelism());
+ ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
}
/**
@@ -42,6 +42,6 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
@Override
public void execute(String jobName) throws Exception {
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
- getDegreeOfParallelism());
+ getParallelism());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 2eb05ad..3142bdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -120,8 +120,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Override
public String toString() {
- return "Remote Environment (" + this.host + ":" + this.port + " - DOP = "
- + (getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism()) + ")";
+ return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
+ + (getParallelism() == -1 ? "default" : getParallelism()) + ")";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7d41d2a..7ae78f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -32,15 +32,15 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
protected List<File> jars;
protected Client client;
- protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+ protected StreamContextEnvironment(Client client, List<File> jars, int parallelism) {
this.client = client;
this.jars = jars;
- if (dop > 0) {
- setDegreeOfParallelism(dop);
+ if (parallelism > 0) {
+ setParallelism(parallelism);
} else {
- setDegreeOfParallelism(GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+ setParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dc1826b..9f2ccff 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -64,7 +64,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
*/
public abstract class StreamExecutionEnvironment {
- private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
+ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
private long bufferTimeout = 100;
@@ -93,9 +93,9 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Gets the degree of parallelism with which operation are executed by
+ * Gets the parallelism with which operations are executed by
* default. Operations can individually override this value to use a
- * specific degree of parallelism.
+ * specific parallelism.
*
* @return The parallelism used by operations, unless they
* override that value.
@@ -252,12 +252,12 @@ public abstract class StreamExecutionEnvironment {
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
*
- * @param degreeOfParallelism
- * The degree of parallelism to use as the default local
+ * @param parallelism
+ * The parallelism to use as the default local
* parallelism.
*/
- public static void setDefaultLocalParallelism(int degreeOfParallelism) {
- defaultLocalDop = degreeOfParallelism;
+ public static void setDefaultLocalParallelism(int parallelism) {
+ defaultLocalParallelism = parallelism;
}
// --------------------------------------------------------------------------------------------
@@ -617,7 +617,7 @@ public abstract class StreamExecutionEnvironment {
}
boolean isParallel = function instanceof ParallelSourceFunction;
- int dop = isParallel ? getDegreeOfParallelism() : 1;
+ int parallelism = isParallel ? getParallelism() : 1;
ClosureCleaner.clean(function, true);
StreamInvokable<OUT, OUT> sourceInvokable = new SourceInvokable<OUT>(function);
@@ -626,7 +626,7 @@ public abstract class StreamExecutionEnvironment {
outTypeInfo, sourceInvokable, isParallel);
streamGraph.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo,
- sourceName, dop);
+ sourceName, parallelism);
return returnStream;
}
@@ -652,7 +652,7 @@ public abstract class StreamExecutionEnvironment {
if (env instanceof ContextEnvironment) {
ContextEnvironment ctx = (ContextEnvironment) env;
currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(),
- ctx.getDegreeOfParallelism());
+ ctx.getParallelism());
} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
currentEnvironment = new StreamPlanEnvironment(env);
} else {
@@ -662,38 +662,38 @@ public abstract class StreamExecutionEnvironment {
}
private static StreamExecutionEnvironment createContextEnvironment(Client client,
- List<File> jars, int dop) {
- return new StreamContextEnvironment(client, jars, dop);
+ List<File> jars, int parallelism) {
+ return new StreamContextEnvironment(client, jars, parallelism);
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
- * environment was created in. The default degree of parallelism of the
+ * environment was created in. The default parallelism of the
* local environment is the number of hardware contexts (CPU cores /
* threads), unless it was specified differently by
- * {@link #setDegreeOfParallelism(int)}.
+ * {@link #setParallelism(int)}.
*
* @return A local execution environment.
*/
public static LocalStreamEnvironment createLocalEnvironment() {
- return createLocalEnvironment(defaultLocalDop);
+ return createLocalEnvironment(defaultLocalParallelism);
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
- * environment was created in. It will use the degree of parallelism
+ * environment was created in. It will use the parallelism
* specified in the parameter.
*
- * @param degreeOfParallelism
- * The degree of parallelism for the local environment.
- * @return A local execution environment with the specified degree of
+ * @param parallelism
+ * The parallelism for the local environment.
+ * @return A local execution environment with the specified
* parallelism.
*/
- public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
+ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
currentEnvironment = new LocalStreamEnvironment();
- currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+ currentEnvironment.setParallelism(parallelism);
return (LocalStreamEnvironment) currentEnvironment;
}
@@ -703,7 +703,7 @@ public abstract class StreamExecutionEnvironment {
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use no parallelism, unless the parallelism is set
- * explicitly via {@link #setDegreeOfParallelism}.
+ * explicitly via {@link #setParallelism}.
*
* @param host
* The host name or address of the master (JobManager), where the
@@ -728,7 +728,7 @@ public abstract class StreamExecutionEnvironment {
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
- * execution will use the specified degree of parallelism.
+ * execution will use the specified parallelism.
*
* @param host
* The host name or address of the master (JobManager), where the
@@ -736,8 +736,8 @@ public abstract class StreamExecutionEnvironment {
* @param port
* The port of the master (JobManager), where the program should
* be executed.
- * @param degreeOfParallelism
- * The degree of parallelism to use during the execution.
+ * @param parallelism
+ * The parallelism to use during the execution.
* @param jarFiles
* The JAR files with code that needs to be shipped to the
* cluster. If the program uses user-defined functions,
@@ -746,9 +746,9 @@ public abstract class StreamExecutionEnvironment {
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
- int degreeOfParallelism, String... jarFiles) {
+ int parallelism, String... jarFiles) {
currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
- currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+ currentEnvironment.setParallelism(parallelism);
return currentEnvironment;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 1cff7e7..2cf5cc2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,13 +32,13 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
super();
this.env = env;
- int dop = env.getDegreeOfParallelism();
- if (dop > 0) {
- setDegreeOfParallelism(dop);
+ int parallelism = env.getParallelism();
+ if (parallelism > 0) {
+ setParallelism(parallelism);
} else {
- setDegreeOfParallelism(GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+ setParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 9ff8a7f..947f8ab 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -68,7 +68,7 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
- // set the prefix if we have a >1 DOP
+ // set the prefix if the parallelism is greater than 1
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") : null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index ec8c226..0a423cc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
* Interface for a stream data source.
*
* <p>Sources implementing this specific interface are executed with
- * degree of parallelism 1. To execute your sources in parallel
+ * parallelism 1. To execute your sources in parallel
* see {@link ParallelSourceFunction}.</p>
*
* @param <OUT> The type of the records produced by this source.
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 691b111..d04e7e6 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -37,12 +37,12 @@ public class ClusterUtil {
*
* @param jobGraph
* jobGraph
- * @param degreeOfParallelism
+ * @param parallelism
* numberOfTaskTrackers
* @param memorySize
* memorySize
*/
- public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize)
+ public static void runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
throws Exception {
Configuration configuration = jobGraph.getJobConfiguration();
@@ -50,7 +50,7 @@ public class ClusterUtil {
LocalFlinkMiniCluster exec = null;
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index d853846..0446b61 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -27,11 +27,11 @@ import org.junit.Test;
public abstract class StreamingProgramTestBase extends AbstractTestBase {
- private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
+ private static final int DEFAULT_PARALLELISM = 4;
private JobExecutionResult latestExecutionResult;
- private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+ private int parallelism = DEFAULT_PARALLELISM;
public StreamingProgramTestBase() {
@@ -40,16 +40,16 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
public StreamingProgramTestBase(Configuration config) {
super(config);
- setTaskManagerNumSlots(degreeOfParallelism);
+ setTaskManagerNumSlots(parallelism);
}
- public void setDegreeOfParallelism(int degreeOfParallelism) {
- this.degreeOfParallelism = degreeOfParallelism;
- setTaskManagerNumSlots(degreeOfParallelism);
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ setTaskManagerNumSlots(parallelism);
}
- public int getDegreeOfParallelism() {
- return degreeOfParallelism;
+ public int getParallelism() {
+ return parallelism;
}
public JobExecutionResult getLatestExecutionResult() {
@@ -86,7 +86,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestStreamEnvironment env = new TestStreamEnvironment(this.executor, this.degreeOfParallelism);
+ TestStreamEnvironment env = new TestStreamEnvironment(this.executor, this.parallelism);
env.setAsContext();
// call the test program
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 6e0821d..5e785f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -38,15 +38,15 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
private ForkableFlinkMiniCluster executor;
private boolean internalExecutor;
- public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
- setDegreeOfParallelism(degreeOfParallelism);
+ public TestStreamEnvironment(int parallelism, long memorySize){
+ setParallelism(parallelism);
this.memorySize = memorySize;
internalExecutor = true;
}
- public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int dop){
+ public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
this.executor = executor;
- setDefaultLocalParallelism(dop);
+ setDefaultLocalParallelism(parallelism);
}
@Override
@@ -62,7 +62,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = jobGraph.getJobConfiguration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- getDegreeOfParallelism());
+ getParallelism());
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
executor = new ForkableFlinkMiniCluster(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 59f86b8..3db6531 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -57,11 +57,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
def getJavaStream: JavaStream[T] = javaStream
/**
- * Sets the degree of parallelism of this operation. This must be greater than 1.
+ * Sets the parallelism of this operation. This must be at least 1.
*/
- def setParallelism(dop: Int): DataStream[T] = {
+ def setParallelism(parallelism: Int): DataStream[T] = {
javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+ case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
case _ =>
throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " +
"have " +
@@ -71,7 +71,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
- * Returns the degree of parallelism of this operation.
+ * Returns the parallelism of this operation.
*/
def getParallelism: Int = javaStream match {
case op: SingleOutputStreamOperator[_, _] => op.getParallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 598b590..cfa7c18 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -33,9 +33,9 @@ import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.Wat
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
- * Sets the degree of parallelism (DOP) for operations executed through this environment.
- * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
- * x parallel instances. This value can be overridden by specific operations using
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific operations using
* [[DataStream.setParallelism]].
* @deprecated Please use [[setParallelism]]
*/
@@ -239,7 +239,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* same type and must be serializable.
*
* * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a degree of parallelism of one.
+ * a parallelism of one.
*/
def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
@@ -251,7 +251,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* because the framework may move the elements into the cluster if needed.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a degree of parallelism of one.
+ * a parallelism of one.
*/
def fromCollection[T: ClassTag: TypeInformation](
data: Seq[T]): DataStream[T] = {
@@ -352,16 +352,16 @@ object StreamExecutionEnvironment {
* of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
*/
def createLocalEnvironment(
- degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
+ parallelism: Int = Runtime.getRuntime.availableProcessors()):
StreamExecutionEnvironment = {
- new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+ new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program to
* a cluster for execution. Note that all file paths used in the program must be accessible from
- * the cluster. The execution will use the cluster's default degree of parallelism, unless the
- * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+ * the cluster. The execution will use the cluster's default parallelism, unless the
+ * parallelism is set explicitly via [[StreamExecutionEnvironment.setParallelism()]].
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
@@ -380,12 +380,12 @@ object StreamExecutionEnvironment {
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible
- * from the cluster. The execution will use the specified degree of parallelism.
+ * from the cluster. The execution will use the specified parallelism.
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
- * @param degreeOfParallelism The degree of parallelism to use during the execution.
+ * @param parallelism The parallelism to use during the execution.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
* program uses
* user-defined functions, user-defined input formats, or any libraries,
@@ -395,10 +395,10 @@ object StreamExecutionEnvironment {
def createRemoteEnvironment(
host: String,
port: Int,
- degreeOfParallelism: Int,
+ parallelism: Int,
jarFiles: String*): StreamExecutionEnvironment = {
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
- javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ javaEnv.setParallelism(parallelism)
new StreamExecutionEnvironment(javaEnv)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 9396b66..fd328ae 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -41,7 +41,7 @@ import java.io.StringWriter;
/**
* This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the DOPOneExecEnv is here.
+ * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
*/
public class HDFSTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
index e1235b6..3b2fb7f 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
@@ -128,7 +128,7 @@ public class TachyonFileSystemWrapperTest {
addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
- new DopOneTestEnvironment(); // initialize DOP one
+ new DopOneTestEnvironment(); // initialize parallelism one
WordCount.main(new String[]{input, output});
@@ -157,7 +157,7 @@ public class TachyonFileSystemWrapperTest {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
LocalEnvironment le = new LocalEnvironment();
- le.setDegreeOfParallelism(1);
+ le.setParallelism(1);
return le;
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index 788327a..435713b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -69,10 +69,10 @@ public abstract class CompilerTestBase {
public void setup() {
this.dataStats = new DataStatistics();
this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
- this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+ this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
- this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+ this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 6e86896..2214000 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -29,11 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple;
public abstract class JavaProgramTestBase extends AbstractTestBase {
- private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
+ private static final int DEFAULT_PARALLELISM = 4;
private JobExecutionResult latestExecutionResult;
- private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+ private int parallelism = DEFAULT_PARALLELISM;
/**
* The number of times a test should be repeated.
@@ -42,7 +42,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
* tests repeatedly might help to discover resource leaks, race conditions etc.
*/
private int numberOfTestRepetitions = 1;
-
+
private boolean isCollectionExecution;
public JavaProgramTestBase() {
@@ -51,20 +51,20 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
public JavaProgramTestBase(Configuration config) {
super(config);
- setTaskManagerNumSlots(degreeOfParallelism);
+ setTaskManagerNumSlots(parallelism);
}
- public void setDegreeOfParallelism(int degreeOfParallelism) {
- this.degreeOfParallelism = degreeOfParallelism;
- setTaskManagerNumSlots(degreeOfParallelism);
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ setTaskManagerNumSlots(parallelism);
}
public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
this.numberOfTestRepetitions = numberOfTestRepetitions;
}
- public int getDegreeOfParallelism() {
- return isCollectionExecution ? 1 : degreeOfParallelism;
+ public int getParallelism() {
+ return isCollectionExecution ? 1 : parallelism;
}
public JobExecutionResult getLatestExecutionResult() {
@@ -110,7 +110,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+ TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
env.getConfig().enableObjectReuse();
env.setAsContext();
@@ -162,7 +162,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+ TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
env.getConfig().disableObjectReuse();
env.setAsContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 67a4797..eafe9ad 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -35,7 +35,7 @@ import org.junit.Test;
public abstract class RecordAPITestBase extends AbstractTestBase {
- protected static final int DOP = 4;
+ protected static final int parallelism = 4;
protected JobExecutionResult jobExecutionResult;
@@ -48,7 +48,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
public RecordAPITestBase(Configuration config) {
super(config);
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 44f35e7..02c1434 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -39,9 +39,9 @@ public class TestEnvironment extends ExecutionEnvironment {
protected JobExecutionResult latestResult;
- public TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
+ public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
this.executor = executor;
- setDegreeOfParallelism(degreeOfParallelism);
+ setParallelism(parallelism);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index cd38418..1d7d206 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -83,7 +83,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines"));
- Assert.assertEquals(Double.valueOf(getDegreeOfParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
+ Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
// Test histogram (words per line distribution)
Map<Integer, Integer> dist = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index daf5181..4868aff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -95,7 +95,7 @@ public class BroadcastBranchingITCase extends RecordAPITestBase {
MapOperator mp2 = MapOperator.builder(Mp2.class).setBroadcastVariable("z", mp1).input(jn2).build();
FileDataSink output = new FileDataSink(new ContractITCaseOutputFormat(), resultPath);
- output.setDegreeOfParallelism(1);
+ output.setParallelism(1);
output.setInput(mp2);
return new Plan(output);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
index 06c7506..41d24b8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
@@ -40,7 +40,7 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(4);
+ env.setParallelism(4);
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index e9374b1..34d4133 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -67,7 +67,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
private static final int NUM_FEATURES = 3;
- private static final int DOP = 4;
+ private static final int parallelism = 4;
protected String pointsPath;
@@ -76,7 +76,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
protected String resultPath;
public BroadcastVarsNepheleITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -131,7 +131,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
@Override
protected JobGraph getJobGraph() throws Exception {
- return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, DOP);
+ return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, parallelism);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 8e13408..2406d6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -60,9 +60,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
private static final int MEMORY_PER_CONSUMER = 2;
- private static final int DOP = 4;
+ private static final int parallelism = 4;
- private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+ private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
protected String dataPath;
protected String clusterPath;
@@ -70,7 +70,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
public KMeansIterativeNepheleITCase() {
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -87,7 +87,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
@Override
protected JobGraph getJobGraph() throws Exception {
- return createJobGraph(dataPath, clusterPath, this.resultPath, DOP, 20);
+ return createJobGraph(dataPath, clusterPath, this.resultPath, parallelism, 20);
}
// -------------------------------------------------------------------------------------------------------------
@@ -252,8 +252,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
return tail;
}
- private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
- AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+ private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int parallelism) {
+ AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.setIterationId(ITERATION_ID);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 9bbd282..36db34d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -33,10 +33,10 @@ import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class MapCancelingITCase extends CancellingTestBase {
- private static final int DOP = 4;
+ private static final int parallelism = 4;
public MapCancelingITCase() {
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
// @Test
@@ -51,7 +51,7 @@ public class MapCancelingITCase extends CancellingTestBase {
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 5 * 1000, 10 * 1000);
}
@@ -68,7 +68,7 @@ public class MapCancelingITCase extends CancellingTestBase {
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 5 * 1000, 10 * 1000);
}
@@ -85,7 +85,7 @@ public class MapCancelingITCase extends CancellingTestBase {
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 10 * 1000, 10 * 1000);
}
@@ -102,7 +102,7 @@ public class MapCancelingITCase extends CancellingTestBase {
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 10 * 1000, 10 * 1000);
}