You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:26 UTC
[6/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 47446dd..55cbb0f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -57,13 +57,13 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
po.setInput(input);
- // set dop
+ // set parallelism
if(this.getParallelism() > 0) {
- // use specified dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // use specified parallelism
+ po.setParallelism(this.getParallelism());
} else {
- // if no dop has been specified, use dop of input operator to enable chaining
- po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ // if no parallelism has been specified, use parallelism of input operator to enable chaining
+ po.setParallelism(input.getParallelism());
}
return po;
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 617162b..3c1d47c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -103,8 +103,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
po.setInput(input);
- // the degree of parallelism for a non grouped reduce can only be 1
- po.setDegreeOfParallelism(1);
+ // the parallelism for a non grouped reduce can only be 1
+ po.setParallelism(1);
return po;
}
@@ -130,13 +130,13 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
}
po.setGroupOrder(o);
- po.setDegreeOfParallelism(this.getParallelism());
+ po.setParallelism(this.getParallelism());
return po;
} else {
PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
selectorKeys, function, getInputType(), getResultType(), name, input);
- po.setDegreeOfParallelism(this.getParallelism());
+ po.setParallelism(this.getParallelism());
return po;
}
}
@@ -148,7 +148,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
po.setInput(input);
- po.setDegreeOfParallelism(getParallelism());
+ po.setParallelism(getParallelism());
// set group order
if (grouper instanceof SortedGrouping) {
@@ -193,7 +193,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
mapper.setInput(input);
// set the mapper's parallelism to the input parallelism to make sure it is chained
- mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ mapper.setParallelism(input.getParallelism());
return reducer;
}
@@ -220,7 +220,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
mapper.setInput(input);
// set the mapper's parallelism to the input parallelism to make sure it is chained
- mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ mapper.setParallelism(input.getParallelism());
return reducer;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c542192..e809623 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -138,8 +138,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
po.setCombinable(combinable);
po.setInput(input);
- // the degree of parallelism for a non grouped reduce can only be 1
- po.setDegreeOfParallelism(1);
+ // the parallelism for a non grouped reduce can only be 1
+ po.setParallelism(1);
return po;
}
@@ -165,14 +165,14 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
}
po.setGroupOrder(o);
- po.setDegreeOfParallelism(this.getParallelism());
+ po.setParallelism(this.getParallelism());
po.setCustomPartitioner(grouper.getCustomPartitioner());
return po;
} else {
PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable());
- po.setDegreeOfParallelism(this.getParallelism());
+ po.setParallelism(this.getParallelism());
po.setCustomPartitioner(grouper.getCustomPartitioner());
return po;
}
@@ -186,7 +186,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
po.setCombinable(combinable);
po.setInput(input);
- po.setDegreeOfParallelism(getParallelism());
+ po.setParallelism(getParallelism());
po.setCustomPartitioner(grouper.getCustomPartitioner());
// set group order
@@ -233,7 +233,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
mapper.setInput(input);
// set the mapper's parallelism to the input parallelism to make sure it is chained
- mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ mapper.setParallelism(input.getParallelism());
return reducer;
}
@@ -261,7 +261,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
mapper.setInput(input);
// set the mapper's parallelism to the input parallelism to make sure it is chained
- mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ mapper.setParallelism(input.getParallelism());
return reducer;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 8b61779..e450ae1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -273,8 +273,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function,
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
translated = po;
}
@@ -292,8 +292,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
function, getInput1Type(), getInput2Type(), getResultType(), name,
input1, input2);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
translated = po;
}
@@ -311,8 +311,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
translated = po;
}
@@ -332,8 +332,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
// set inputs
po.setFirstInput(input1);
po.setSecondInput(input2);
- // set dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ po.setParallelism(this.getParallelism());
translated = po;
}
@@ -375,9 +375,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
keyMapper1.setInput(input1);
keyMapper2.setInput(input2);
- // set dop
- keyMapper1.setDegreeOfParallelism(input1.getDegreeOfParallelism());
- keyMapper2.setDegreeOfParallelism(input2.getDegreeOfParallelism());
+ // set parallelism
+ keyMapper1.setParallelism(input1.getParallelism());
+ keyMapper2.setParallelism(input2.getParallelism());
return join;
}
@@ -427,8 +427,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
join.setSecondInput(keyMapper2);
keyMapper2.setInput(input2);
- // set dop
- keyMapper2.setDegreeOfParallelism(input2.getDegreeOfParallelism());
+ // set parallelism
+ keyMapper2.setParallelism(input2.getParallelism());
return join;
}
@@ -477,8 +477,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
join.setSecondInput(input2);
keyMapper1.setInput(input1);
- // set dop
- keyMapper1.setDegreeOfParallelism(input1.getDegreeOfParallelism());
+ // set parallelism
+ keyMapper1.setParallelism(input1.getParallelism());
return join;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 7d2bbaa..2663a2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -61,13 +61,13 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
po.setInput(input);
- // set dop
+ // set parallelism
if(this.getParallelism() > 0) {
- // use specified dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // use specified parallelism
+ po.setParallelism(this.getParallelism());
} else {
- // if no dop has been specified, use dop of input operator to enable chaining
- po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ // if no parallelism has been specified, use parallelism of input operator to enable chaining
+ po.setParallelism(input.getParallelism());
}
return po;
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index a6c69c1..d8a1abd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -60,13 +60,13 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
po.setInput(input);
- // set dop
+ // set parallelism
if(this.getParallelism() > 0) {
- // use specified dop
- po.setDegreeOfParallelism(this.getParallelism());
+ // use specified parallelism
+ po.setParallelism(this.getParallelism());
} else {
- // if no dop has been specified, use dop of input operator to enable chaining
- po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ // if no parallelism has been specified, use parallelism of input operator to enable chaining
+ po.setParallelism(input.getParallelism());
}
return po;
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 0f8a3eb..6d02749 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -32,7 +32,7 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
protected String name;
- protected int dop = -1;
+ protected int parallelism = -1;
protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
super(context, resultType);
@@ -58,12 +58,12 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
}
/**
- * Returns the degree of parallelism of this operator.
+ * Returns the parallelism of this operator.
*
- * @return The degree of parallelism of this operator.
+ * @return The parallelism of this operator.
*/
public int getParallelism() {
- return this.dop;
+ return this.parallelism;
}
/**
@@ -82,17 +82,17 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
}
/**
- * Sets the degree of parallelism for this operator.
+ * Sets the parallelism for this operator.
* The degree must be 1 or more.
*
- * @param dop The degree of parallelism for this operator.
- * @return The operator with set degree of parallelism.
+ * @param parallelism The parallelism for this operator.
+ * @return The operator with set parallelism.
*/
- public O setParallelism(int dop) {
- if(dop < 1) {
+ public O setParallelism(int parallelism) {
+ if(parallelism < 1) {
throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
}
- this.dop = dop;
+ this.parallelism = parallelism;
@SuppressWarnings("unchecked")
O returnType = (O) this;
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 68a216b..28c1c29 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -218,7 +218,7 @@ public class OperatorTranslation {
iterationOperator.setMaximumNumberOfIterations(iterationEnd.getMaxIterations());
if (iterationHead.getParallelism() > 0) {
- iterationOperator.setDegreeOfParallelism(iterationHead.getParallelism());
+ iterationOperator.setParallelism(iterationHead.getParallelism());
}
DeltaIteration.SolutionSetPlaceHolder<D> solutionSetPlaceHolder = iterationHead.getSolutionSet();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index edb5a68..bf9c8e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -118,7 +118,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
noop.setInput(input);
- noop.setDegreeOfParallelism(getParallelism());
+ noop.setParallelism(getParallelism());
return noop;
}
@@ -131,7 +131,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
noop.setInput(input);
- noop.setDegreeOfParallelism(getParallelism());
+ noop.setParallelism(getParallelism());
noop.setCustomPartitioner(customPartitioner);
return noop;
@@ -177,10 +177,10 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
noop.setCustomPartitioner(customPartitioner);
- // set dop
- keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
- noop.setDegreeOfParallelism(partitionDop);
- keyRemovingMap.setDegreeOfParallelism(partitionDop);
+ // set parallelism
+ keyExtractingMap.setParallelism(input.getParallelism());
+ noop.setParallelism(partitionDop);
+ keyRemovingMap.setParallelism(partitionDop);
return keyRemovingMap;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 16d9ff3..9b7d567 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -71,8 +71,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType(), context.getConfig());
// set input
ppo.setInput(input);
- // set dop
- ppo.setDegreeOfParallelism(this.getParallelism());
+ // set parallelism
+ ppo.setParallelism(this.getParallelism());
ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields, (CompositeType<?>) getInputType()));
return ppo;
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index d1ad4c3..5951df8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -90,8 +90,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
po.setInput(input);
- // the degree of parallelism for a non grouped reduce can only be 1
- po.setDegreeOfParallelism(1);
+ // the parallelism for a non grouped reduce can only be 1
+ po.setParallelism(1);
return po;
}
@@ -118,7 +118,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
po.setCustomPartitioner(grouper.getCustomPartitioner());
po.setInput(input);
- po.setDegreeOfParallelism(getParallelism());
+ po.setParallelism(getParallelism());
return po;
}
@@ -130,7 +130,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
// --------------------------------------------------------------------------------------------
private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
- ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int dop)
+ ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int parallelism)
{
@SuppressWarnings("unchecked")
final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
@@ -148,10 +148,10 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
reducer.setInput(keyExtractingMap);
keyRemovingMap.setInput(reducer);
- // set dop
- keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
- reducer.setDegreeOfParallelism(dop);
- keyRemovingMap.setDegreeOfParallelism(dop);
+ // set parallelism
+ keyExtractingMap.setParallelism(input.getParallelism());
+ reducer.setParallelism(parallelism);
+ keyRemovingMap.setParallelism(parallelism);
return keyRemovingMap;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index f6e3c2a..35c564b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -47,8 +47,8 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
private BlockInfo block;
- public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
- super(numberOfTuples, blockSize, degreeOfParallelism);
+ public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int parallelism) {
+ super(numberOfTuples, blockSize, parallelism);
resultType = TypeExtractor.getForObject(getRecord(0));
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 08eefff..63b4052 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -39,8 +39,8 @@ public class AggregateTranslationTest {
@Test
public void translateAggregate() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, StringValue, Long>> initialData =
@@ -58,7 +58,7 @@ public class AggregateTranslationTest {
assertEquals(1, reducer.getKeyColumns(0).length);
assertEquals(0, reducer.getKeyColumns(0)[0]);
- assertEquals(-1, reducer.getDegreeOfParallelism());
+ assertEquals(-1, reducer.getParallelism());
assertTrue(reducer.isCombinable());
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index 55a2aff..ae89780 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -59,14 +59,14 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
final int[] ITERATION_KEYS = new int[] {2};
final int NUM_ITERATIONS = 13;
- final int DEFAULT_DOP= 133;
- final int ITERATION_DOP = 77;
+ final int DEFAULT_parallelism= 133;
+ final int ITERATION_parallelism = 77;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ------------ construct the test program ------------------
{
- env.setDegreeOfParallelism(DEFAULT_DOP);
+ env.setParallelism(DEFAULT_parallelism);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@@ -75,7 +75,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
- iteration.name(ITERATION_NAME).parallelism(ITERATION_DOP);
+ iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
@@ -100,7 +100,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
// ------------- validate the plan ----------------
assertEquals(JOB_NAME, p.getJobName());
- assertEquals(DEFAULT_DOP, p.getDefaultParallelism());
+ assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
// validate the iteration
GenericDataSinkBase<?> sink1, sink2;
@@ -118,7 +118,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
- assertEquals(ITERATION_DOP, iteration.getDegreeOfParallelism());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
assertEquals(ITERATION_NAME, iteration.getName());
MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 0cf2ee2..b7fbb78 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -76,8 +76,8 @@ public class DistinctTranslationTest {
@Test
public void translateDistinctPlain() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -97,8 +97,8 @@ public class DistinctTranslationTest {
// check keys
assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0));
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
@@ -112,8 +112,8 @@ public class DistinctTranslationTest {
@Test
public void translateDistinctPlain2() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<CustomType> initialData = getSourcePojoDataSet(env);
@@ -133,8 +133,8 @@ public class DistinctTranslationTest {
// check keys
assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
@@ -148,8 +148,8 @@ public class DistinctTranslationTest {
@Test
public void translateDistinctPosition() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -169,8 +169,8 @@ public class DistinctTranslationTest {
// check keys
assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0));
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
@@ -184,8 +184,8 @@ public class DistinctTranslationTest {
@Test
public void translateDistinctKeySelector() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -202,9 +202,9 @@ public class DistinctTranslationTest {
PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
- // check the DOPs
- assertEquals(1, keyExtractor.getDegreeOfParallelism());
- assertEquals(4, reducer.getDegreeOfParallelism());
+ // check the parallelisms
+ assertEquals(1, keyExtractor.getParallelism());
+ assertEquals(4, reducer.getParallelism());
// check types
TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
@@ -232,8 +232,8 @@ public class DistinctTranslationTest {
@Test
public void translateDistinctExpressionKey() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<CustomType> initialData = getSourcePojoDataSet(env);
@@ -253,8 +253,8 @@ public class DistinctTranslationTest {
// check keys
assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index f9253f8..b578eb7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -45,8 +45,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
@Test
public void translateNonGroupedReduce() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -69,8 +69,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
// check keys
assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
@@ -84,8 +84,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
@Test
public void translateGroupedReduceNoMapper() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -108,8 +108,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
- // DOP was not configured on the operator
- assertTrue(reducer.getDegreeOfParallelism() == DOP || reducer.getDegreeOfParallelism() == -1);
+ // parallelism was not configured on the operator
+ assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == -1);
// check keys
assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
@@ -127,8 +127,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
@Test
public void translateGroupedReduceWithkeyExtractor() {
try {
- final int DOP = 8;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+ final int parallelism = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
@@ -154,10 +154,10 @@ public class ReduceTranslationTests implements java.io.Serializable {
PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
- // check the DOPs
- assertEquals(1, keyExtractor.getDegreeOfParallelism());
- assertEquals(4, reducer.getDegreeOfParallelism());
- assertEquals(4, keyProjector.getDegreeOfParallelism());
+ // check the parallelisms
+ assertEquals(1, keyExtractor.getParallelism());
+ assertEquals(4, reducer.getParallelism());
+ assertEquals(4, keyProjector.getParallelism());
// check types
TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index 2101428..90421b7 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -288,9 +288,9 @@ public class Optimizer {
private final CostEstimator costEstimator;
/**
- * The default degree of parallelism for jobs compiled by this compiler.
+ * The default parallelism for jobs compiled by this compiler.
*/
- private int defaultDegreeOfParallelism;
+ private int defaultParallelism;
// ------------------------------------------------------------------------
@@ -348,14 +348,14 @@ public class Optimizer {
this.costEstimator = estimator;
// determine the default parallelism
- this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+ this.defaultParallelism = GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELISM_KEY,
+ ConfigConstants.DEFAULT_PARALLELISM);
- if (defaultDegreeOfParallelism < 1) {
- LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
- + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
- this.defaultDegreeOfParallelism = 1;
+ if (defaultParallelism < 1) {
+ LOG.warn("Config value " + defaultParallelism + " for option "
+ + ConfigConstants.DEFAULT_PARALLELISM + " is invalid. Ignoring and using a value of 1.");
+ this.defaultParallelism = 1;
}
}
@@ -363,13 +363,13 @@ public class Optimizer {
// Getters / Setters
// ------------------------------------------------------------------------
- public int getDefaultDegreeOfParallelism() {
- return defaultDegreeOfParallelism;
+ public int getDefaultParallelism() {
+ return defaultParallelism;
}
- public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
- if (defaultDegreeOfParallelism > 0) {
- this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
+ public void setDefaultParallelism(int defaultParallelism) {
+ if (defaultParallelism > 0) {
+ this.defaultParallelism = defaultParallelism;
} else {
throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
}
@@ -435,7 +435,7 @@ public class Optimizer {
final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
final int defaultParallelism = program.getDefaultParallelism() > 0 ?
- program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
+ program.getDefaultParallelism() : this.defaultParallelism;
// log the default settings
LOG.debug("Using a default parallelism of {}", defaultParallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 068799e..1600a50 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -122,12 +122,12 @@ public class BinaryUnionNode extends TwoInputNode {
final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
- final int dop = getParallelism();
- final int inDop1 = getFirstPredecessorNode().getParallelism();
- final int inDop2 = getSecondPredecessorNode().getParallelism();
+ final int parallelism = getParallelism();
+ final int inParallelism1 = getFirstPredecessorNode().getParallelism();
+ final int inParallelism2 = getSecondPredecessorNode().getParallelism();
- final boolean dopChange1 = dop != inDop1;
- final boolean dopChange2 = dop != inDop2;
+ final boolean dopChange1 = parallelism != inParallelism1;
+ final boolean dopChange2 = parallelism != inParallelism2;
final boolean input1breakPipeline = this.input1.isBreakingPipeline();
final boolean input2breakPipeline = this.input2.isBreakingPipeline();
@@ -152,8 +152,8 @@ public class BinaryUnionNode extends TwoInputNode {
// free to choose the ship strategy
igps.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
+ // if the parallelism changed, make sure that we cancel out properties, unless the
+ // ship strategy preserves/establishes them even under changing parallelisms
if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
@@ -179,8 +179,8 @@ public class BinaryUnionNode extends TwoInputNode {
// free to choose the ship strategy
igps.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
+ // if the parallelism changed, make sure that we cancel out properties, unless the
+ // ship strategy preserves/establishes them even under changing parallelisms
if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 55b8785..5dd868e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -131,14 +131,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
*/
public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
- // check if the root of the step function has the same DOP as the iteration
+ // check if the root of the step function has the same parallelism as the iteration
// or if the step function has any operator at all
if (nextPartialSolution.getParallelism() != getParallelism() ||
nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
{
// add a no-op to the root to express the re-partitioning
NoOpNode noop = new NoOpNode();
- noop.setDegreeOfParallelism(getParallelism());
+ noop.setParallelism(getParallelism());
DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
noop.setIncomingConnection(noOpConn);
@@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
locPropsReq.parameterizeChannel(toNoOp);
UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
- rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+ rebuildPropertiesNode.setParallelism(candidate.getParallelism());
SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index dbe04f4..6ca1149 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -204,11 +204,11 @@ public class DataSinkNode extends OptimizerNode {
List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
- final int dop = getParallelism();
+ final int parallelism = getParallelism();
final int inDop = getPredecessorNode().getParallelism();
final ExecutionMode executionMode = this.input.getDataExchangeMode();
- final boolean dopChange = dop != inDop;
+ final boolean dopChange = parallelism != inDop;
final boolean breakPipeline = this.input.isBreakingPipeline();
InterestingProperties ips = this.input.getInterestingProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index e4b35b7..6010f6a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -75,7 +75,7 @@ public class DataSourceNode extends OptimizerNode {
}
if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
- setDegreeOfParallelism(1);
+ setParallelism(1);
this.sequentialInput = true;
} else {
this.sequentialInput = false;
@@ -115,10 +115,10 @@ public class DataSourceNode extends OptimizerNode {
}
@Override
- public void setDegreeOfParallelism(int degreeOfParallelism) {
+ public void setParallelism(int parallelism) {
// if unsplittable, parallelism remains at 1
if (!this.sequentialInput) {
- super.setDegreeOfParallelism(degreeOfParallelism);
+ super.setParallelism(parallelism);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 564c0d3..d25fed9 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -45,7 +45,7 @@ public class GroupCombineNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
- setDegreeOfParallelism(1);
+ setParallelism(1);
}
this.possibleProperties = initPossibleProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 77acae5..227b75f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -53,7 +53,7 @@ public class GroupReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
- setDegreeOfParallelism(1);
+ setParallelism(1);
}
this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index 0cad34e..6bf43ea 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -99,7 +99,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
// --------------------------------- General Parameters ---------------------------------------
private int parallelism = -1; // the number of parallel instances of this node
-
+
private long minimalMemoryPerSubTask = -1;
protected int id = -1; // the id for this node.
@@ -390,9 +390,9 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* @param parallelism The parallelism to set.
* @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
*/
- public void setDegreeOfParallelism(int parallelism) {
+ public void setParallelism(int parallelism) {
if (parallelism < 1 && parallelism != -1) {
- throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
+ throw new IllegalArgumentException("Parallelism of " + parallelism + " is invalid.");
}
this.parallelism = parallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 1477038..52bfb6a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -43,7 +43,7 @@ public class ReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
- setDegreeOfParallelism(1);
+ setParallelism(1);
}
OperatorDescriptorSingle props = this.keys == null ?
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index cc12bb8..e9b31f4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -283,9 +283,10 @@ public abstract class SingleInputNode extends OptimizerNode {
final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
- final int dop = getParallelism();
- final int inDop = getPredecessorNode().getParallelism();
- final boolean dopChange = inDop != dop;
+ final int parallelism = getParallelism();
+ final int inParallelism = getPredecessorNode().getParallelism();
+
+ final boolean parallelismChange = inParallelism != parallelism;
final boolean breaksPipeline = this.inConn.isBreakingPipeline();
@@ -293,8 +294,8 @@ public abstract class SingleInputNode extends OptimizerNode {
for (PlanNode child : subPlans) {
if (child.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
- if (dopChange) {
+ // fully replicated input is always locally forwarded if the parallelism is not changed
+ if (parallelismChange) {
// can not continue with this child
childrenSkippedDueToReplicatedInput = true;
continue;
@@ -307,11 +308,11 @@ public abstract class SingleInputNode extends OptimizerNode {
// pick the strategy ourselves
for (RequestedGlobalProperties igps: intGlobal) {
final Channel c = new Channel(child, this.inConn.getMaterializationMode());
- igps.parameterizeChannel(c, dopChange, executionMode, breaksPipeline);
+ igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
- if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
+ // if the parallelism changed, make sure that we cancel out properties, unless the
+ // ship strategy preserves/establishes them even under changing parallelisms
+ if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) {
c.getGlobalProperties().reset();
}
@@ -339,7 +340,7 @@ public abstract class SingleInputNode extends OptimizerNode {
c.setShipStrategy(shipStrategy, exMode);
}
- if (dopChange) {
+ if (parallelismChange) {
c.adjustGlobalPropertiesForFullParallelismChange();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 40725ba..06606f0 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -47,7 +47,7 @@ public class SinkJoiner extends TwoInputNode {
this.input1 = conn1;
this.input2 = conn2;
- setDegreeOfParallelism(1);
+ setParallelism(1);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index 39da165..f3122ba 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -352,12 +352,12 @@ public abstract class TwoInputNode extends OptimizerNode {
final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
- final int dop = getParallelism();
- final int inDop1 = getFirstPredecessorNode().getParallelism();
- final int inDop2 = getSecondPredecessorNode().getParallelism();
+ final int parallelism = getParallelism();
+ final int inParallelism1 = getFirstPredecessorNode().getParallelism();
+ final int inParallelism2 = getSecondPredecessorNode().getParallelism();
- final boolean dopChange1 = dop != inDop1;
- final boolean dopChange2 = dop != inDop2;
+ final boolean dopChange1 = parallelism != inParallelism1;
+ final boolean dopChange2 = parallelism != inParallelism2;
final boolean input1breaksPipeline = this.input1.isBreakingPipeline();
final boolean input2breaksPipeline = this.input2.isBreakingPipeline();
@@ -369,7 +369,7 @@ public abstract class TwoInputNode extends OptimizerNode {
for (PlanNode child1 : subPlans1) {
if (child1.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
+ // fully replicated input is always locally forwarded if parallelism is not changed
if (dopChange1) {
// can not continue with this child
childrenSkippedDueToReplicatedInput = true;
@@ -382,7 +382,7 @@ public abstract class TwoInputNode extends OptimizerNode {
for (PlanNode child2 : subPlans2) {
if (child2.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
+ // fully replicated input is always locally forwarded if parallelism is not changed
if (dopChange2) {
// can not continue with this child
childrenSkippedDueToReplicatedInput = true;
@@ -405,8 +405,8 @@ public abstract class TwoInputNode extends OptimizerNode {
// free to choose the ship strategy
igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline);
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
+ // if the parallelism changed, make sure that we cancel out properties, unless the
+ // ship strategy preserves/establishes them even under changing parallelisms
if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
@@ -434,8 +434,8 @@ public abstract class TwoInputNode extends OptimizerNode {
// free to choose the ship strategy
igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
+ // if the parallelism changed, make sure that we cancel out properties, unless the
+ // ship strategy preserves/establishes them even under changing parallelisms
if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index e85f289..99c868c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -167,7 +167,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// if the next workset is equal to the workset, we need to inject a no-op node
if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
NoOpNode noop = new NoOpNode();
- noop.setDegreeOfParallelism(getParallelism());
+ noop.setParallelism(getParallelism());
DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
noop.setIncomingConnection(noOpConn);
@@ -179,7 +179,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// attach an extra node to the solution set delta for the cases where we need to repartition
UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
- solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
+ solutionSetDeltaUpdateAux.setParallelism(getParallelism());
DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
solutionSetDeltaUpdateAux.setIncomingConnection(conn);
@@ -371,7 +371,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
FieldList.EMPTY_LIST);
- rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+ rebuildWorksetPropertiesNode.setParallelism(candidate.getParallelism());
SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
@@ -563,7 +563,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
SingleRootJoiner() {
super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
- setDegreeOfParallelism(1);
+ setParallelism(1);
}
public void setInputs(DagConnection input1, DagConnection input2) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
index 8c3f6bd..3646d74 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
@@ -340,7 +340,7 @@ public final class RequestedGlobalProperties implements Cloneable {
* the desired global properties.
*
* @param channel The channel to parametrize.
- * @param globalDopChange Flag indicating whether the degree of parallelism changes
+ * @param globalDopChange Flag indicating whether the parallelism changes
* between sender and receiver.
* @param exchangeMode The mode of data exchange (pipelined, always batch,
* batch only on shuffle, ...)
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
index b3c083a..4990a5d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
@@ -53,9 +53,9 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- // create an input node for combine with same DOP as input node
+ // create an input node for combine with same parallelism as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+ combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
index a172a60..bd600e4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
@@ -52,9 +52,9 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- // create an input node for combine with same DOP as input node
+ // create an input node for combine with same parallelism as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+ combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
index b648386..64054a2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -71,7 +71,7 @@ public final class GroupCombineProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- node.setDegreeOfParallelism(in.getSource().getParallelism());
+ node.setParallelism(in.getSource().getParallelism());
// sorting key info
SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index c4f47d3..86863d2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,9 +105,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- // create an input node for combine with same DOP as input node
+ // create an input node for combine with same parallelism as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+ combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index 2bde29b..e4e6a7f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -47,9 +47,9 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- // create in input node for combine with same DOP as input node
+ // create an input node for combine with the same parallelism as input node
GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+ combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
DriverStrategy.SORTED_GROUP_COMBINE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 5bb51f3..81afe1e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -69,9 +69,9 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- // create an input node for combine with same DOP as input node
+ // create an input node for combine with same parallelism as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+ combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
"Combine ("+node.getOperator().getName()+")", toCombiner,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 875d1c3..4f8b1be 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -473,7 +473,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
public void adjustGlobalPropertiesForFullParallelismChange() {
if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
- throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
+ throw new IllegalStateException("Cannot adjust channel for parallelism " +
"change before the ship strategy is set.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
index 451484d..c93d8c2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -40,7 +40,7 @@ public class SinkJoinerPlanNode extends DualInputPlanNode {
public void setCosts(Costs nodeCosts) {
// the plan enumeration logic works as for regular two-input-operators, which is important
// because of the branch handling logic. it does pick redistributing network channels
- // between the sink and the sink joiner, because sinks joiner has a different DOP than the sink.
+ // between the sink and the sink joiner, because the sink joiner has a different parallelism than the sink.
// we discard any cost and simply use the sum of the costs from the two children.
Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 6f918c0..b04cdd8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -275,7 +275,7 @@ public class PlanJSONDumpGenerator {
// output node contents
writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
- // degree of parallelism
+ // parallelism
writer.print(",\n\t\t\"parallelism\": \""
+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 04bc527..dc21c13 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -264,15 +264,15 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// for the bulk iteration, we skip creating anything for now. we create the graph
// for the step function in the post visit.
- // check that the root of the step function has the same DOP as the iteration.
- // because the tail must have the same DOP as the head, we can only merge the last
- // operator with the tail, if they have the same DOP. not merging is currently not
+ // check that the root of the step function has the same parallelism as the iteration.
+ // because the tail must have the same parallelism as the head, we can only merge the last
+ // operator with the tail, if they have the same parallelism. not merging is currently not
// implemented
PlanNode root = iterationNode.getRootOfStepFunction();
if (root.getParallelism() != node.getParallelism())
{
throw new CompilerException("Error: The final operator of the step " +
- "function has a different degree of parallelism than the iteration operator itself.");
+ "function has a different parallelism than the iteration operator itself.");
}
IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
@@ -289,12 +289,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (nextWorkSet.getParallelism() != node.getParallelism())
{
throw new CompilerException("It is currently not supported that the final operator of the step " +
- "function has a different degree of parallelism than the iteration operator itself.");
+ "function has a different parallelism than the iteration operator itself.");
}
if (solutionSetDelta.getParallelism() != node.getParallelism())
{
throw new CompilerException("It is currently not supported that the final operator of the step " +
- "function has a different degree of parallelism than the iteration operator itself.");
+ "function has a different parallelism than the iteration operator itself.");
}
IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
@@ -362,7 +362,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// check if a vertex was created, or if it was chained or skipped
if (vertex != null) {
- // set degree of parallelism
+ // set parallelism
int pd = node.getParallelism();
vertex.setParallelism(pd);
@@ -370,10 +370,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// check whether this vertex is part of an iteration step function
if (this.currentIteration != null) {
- // check that the task has the same DOP as the iteration as such
+ // check that the task has the same parallelism as the iteration as such
PlanNode iterationNode = (PlanNode) this.currentIteration;
if (iterationNode.getParallelism() < pd) {
- throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
+ throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, parallelism than the iteration operator.");
}
// store the id of the iterations the step functions participate in
@@ -725,7 +725,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return 1;
}
else {
- throw new CompilerException("Error: A changing degree of parallelism is currently " +
+ throw new CompilerException("Error: A changing parallelism is currently " +
"not supported between tasks within an iteration.");
}
} else {
@@ -880,7 +880,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// the step function, if
// 1) There is one parent that the partial solution connects to via a forward pattern and no
// local strategy
- // 2) DOP and the number of subtasks per instance does not change
+ // 2) parallelism and the number of subtasks per instance does not change
// 3) That successor is not a union
// 4) That successor is not itself the last node of the step function
// 5) There is no local strategy on the edge for the initial partial solution, as
@@ -948,7 +948,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// the step function, if
// 1) There is one parent that the partial solution connects to via a forward pattern and no
// local strategy
- // 2) DOP and the number of subtasks per instance does not change
+ // 2) parallelism and the number of subtasks per instance does not change
// 3) That successor is not a union
// 4) That successor is not itself the last node of the step function
// 5) There is no local strategy on the edge for the initial workset, as
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 160ef95..37cffce 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -83,20 +83,20 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
- private final int defaultParallelism; // the default degree of parallelism
+ private final int defaultParallelism; // the default parallelism
private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation
private final ExecutionMode defaultDataExchangeMode;
- private final boolean forceDOP;
+ private final boolean forceParallelism;
public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
this(null, false, defaultParallelism, defaultDataExchangeMode, null);
}
- private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
+ private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceParallelism, int defaultParallelism,
ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
if (closure == null){
con2node = new HashMap<Operator<?>, OptimizerNode>();
@@ -108,7 +108,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
this.defaultParallelism = defaultParallelism;
this.parent = parent;
this.defaultDataExchangeMode = dataExchangeMode;
- this.forceDOP = forceDOP;
+ this.forceParallelism = forceParallelism;
}
public List<DataSinkNode> getSinks() {
@@ -194,7 +194,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
// catch this for the recursive translation of step functions
BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ p.setParallelism(containingIterationNode.getParallelism());
n = p;
}
else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
@@ -209,7 +209,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
// catch this for the recursive translation of step functions
WorksetNode p = new WorksetNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ p.setParallelism(containingIterationNode.getParallelism());
n = p;
}
else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
@@ -224,7 +224,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
// catch this for the recursive translation of step functions
SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ p.setParallelism(containingIterationNode.getParallelism());
n = p;
}
else {
@@ -233,13 +233,13 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
this.con2node.put(c, n);
- // set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
+ // set the parallelism only if it has not been set before. some nodes have a fixed parallelism, such as the
// key-less reducer (all-reduce)
if (n.getParallelism() < 1) {
- // set the degree of parallelism
- int par = c.getDegreeOfParallelism();
+ // set the parallelism
+ int par = c.getParallelism();
if (par > 0) {
- if (this.forceDOP && par != this.defaultParallelism) {
+ if (this.forceParallelism && par != this.defaultParallelism) {
par = this.defaultParallelism;
Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
"currently fixed to the parallelism of the surrounding operator (the iteration).");
@@ -247,7 +247,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
} else {
par = this.defaultParallelism;
}
- n.setDegreeOfParallelism(par);
+ n.setParallelism(par);
}
return true;