You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:26 UTC

[6/9] flink git commit: [FLINK-1679] use a consistent name for parallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 47446dd..55cbb0f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -57,13 +57,13 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
-		// set dop
+		// set parallelism
 		if(this.getParallelism() > 0) {
-			// use specified dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			// use specified parallelism
+			po.setParallelism(this.getParallelism());
 		} else {
-			// if no dop has been specified, use dop of input operator to enable chaining
-			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+			// if no parallelism has been specified, use parallelism of input operator to enable chaining
+			po.setParallelism(input.getParallelism());
 		}
 		
 		return po;

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 617162b..3c1d47c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -103,8 +103,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
 
 			po.setInput(input);
-			// the degree of parallelism for a non grouped reduce can only be 1
-			po.setDegreeOfParallelism(1);
+			// the parallelism for a non grouped reduce can only be 1
+			po.setParallelism(1);
 			return po;
 		}
 
@@ -130,13 +130,13 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 				}
 				po.setGroupOrder(o);
 
-				po.setDegreeOfParallelism(this.getParallelism());
+				po.setParallelism(this.getParallelism());
 				return po;
 			} else {
 				PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
 						selectorKeys, function, getInputType(), getResultType(), name, input);
 
-				po.setDegreeOfParallelism(this.getParallelism());
+				po.setParallelism(this.getParallelism());
 				return po;
 			}
 		}
@@ -148,7 +148,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 					new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setInput(input);
-			po.setDegreeOfParallelism(getParallelism());
+			po.setParallelism(getParallelism());
 
 			// set group order
 			if (grouper instanceof SortedGrouping) {
@@ -193,7 +193,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		mapper.setInput(input);
 
 		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		mapper.setParallelism(input.getParallelism());
 
 		return reducer;
 	}
@@ -220,7 +220,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		mapper.setInput(input);
 
 		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		mapper.setParallelism(input.getParallelism());
 
 		return reducer;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c542192..e809623 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -138,8 +138,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			
 			po.setCombinable(combinable);
 			po.setInput(input);
-			// the degree of parallelism for a non grouped reduce can only be 1
-			po.setDegreeOfParallelism(1);
+			// the parallelism for a non grouped reduce can only be 1
+			po.setParallelism(1);
 			return po;
 		}
 	
@@ -165,14 +165,14 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 				}
 				po.setGroupOrder(o);
 
-				po.setDegreeOfParallelism(this.getParallelism());
+				po.setParallelism(this.getParallelism());
 				po.setCustomPartitioner(grouper.getCustomPartitioner());
 				return po;
 			} else {
 				PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
 							selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable());
 
-				po.setDegreeOfParallelism(this.getParallelism());
+				po.setParallelism(this.getParallelism());
 				po.setCustomPartitioner(grouper.getCustomPartitioner());
 				return po;
 			}
@@ -186,7 +186,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 
 			po.setCombinable(combinable);
 			po.setInput(input);
-			po.setDegreeOfParallelism(getParallelism());
+			po.setParallelism(getParallelism());
 			po.setCustomPartitioner(grouper.getCustomPartitioner());
 			
 			// set group order
@@ -233,7 +233,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		mapper.setInput(input);
 		
 		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		mapper.setParallelism(input.getParallelism());
 		
 		return reducer;
 	}
@@ -261,7 +261,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		mapper.setInput(input);
 
 		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		mapper.setParallelism(input.getParallelism());
 
 		return reducer;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 8b61779..e450ae1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -273,8 +273,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 						translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, 
 						getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 				
-				// set dop
-				po.setDegreeOfParallelism(this.getParallelism());
+				// set parallelism
+				po.setParallelism(this.getParallelism());
 				
 				translated = po;
 			}
@@ -292,8 +292,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 								function, getInput1Type(), getInput2Type(), getResultType(), name,
 								input1, input2);
 
-				// set dop
-				po.setDegreeOfParallelism(this.getParallelism());
+				// set parallelism
+				po.setParallelism(this.getParallelism());
 
 				translated = po;
 			}
@@ -311,8 +311,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 						translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
 								getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
-				// set dop
-				po.setDegreeOfParallelism(this.getParallelism());
+				// set parallelism
+				po.setParallelism(this.getParallelism());
 
 				translated = po;
 			}
@@ -332,8 +332,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// set inputs
 				po.setFirstInput(input1);
 				po.setSecondInput(input2);
-				// set dop
-				po.setDegreeOfParallelism(this.getParallelism());
+				// set parallelism
+				po.setParallelism(this.getParallelism());
 				
 				translated = po;
 			}
@@ -375,9 +375,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			keyMapper1.setInput(input1);
 			keyMapper2.setInput(input2);
-			// set dop
-			keyMapper1.setDegreeOfParallelism(input1.getDegreeOfParallelism());
-			keyMapper2.setDegreeOfParallelism(input2.getDegreeOfParallelism());
+			// set parallelism
+			keyMapper1.setParallelism(input1.getParallelism());
+			keyMapper2.setParallelism(input2.getParallelism());
 			
 			return join;
 		}
@@ -427,8 +427,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			join.setSecondInput(keyMapper2);
 			
 			keyMapper2.setInput(input2);
-			// set dop
-			keyMapper2.setDegreeOfParallelism(input2.getDegreeOfParallelism());
+			// set parallelism
+			keyMapper2.setParallelism(input2.getParallelism());
 			
 			return join;
 		}
@@ -477,8 +477,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			join.setSecondInput(input2);
 			
 			keyMapper1.setInput(input1);
-			// set dop
-			keyMapper1.setDegreeOfParallelism(input1.getDegreeOfParallelism());
+			// set parallelism
+			keyMapper1.setParallelism(input1.getParallelism());
 
 			return join;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 7d2bbaa..2663a2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -61,13 +61,13 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 				new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
-		// set dop
+		// set parallelism
 		if(this.getParallelism() > 0) {
-			// use specified dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			// use specified parallelism
+			po.setParallelism(this.getParallelism());
 		} else {
-			// if no dop has been specified, use dop of input operator to enable chaining
-			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+			// if no parallelism has been specified, use parallelism of input operator to enable chaining
+			po.setParallelism(input.getParallelism());
 		}
 		
 		return po;

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index a6c69c1..d8a1abd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -60,13 +60,13 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
-		// set dop
+		// set parallelism
 		if(this.getParallelism() > 0) {
-			// use specified dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			// use specified parallelism
+			po.setParallelism(this.getParallelism());
 		} else {
-			// if no dop has been specified, use dop of input operator to enable chaining
-			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+			// if no parallelism has been specified, use parallelism of input operator to enable chaining
+			po.setParallelism(input.getParallelism());
 		}
 		
 		return po;

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 0f8a3eb..6d02749 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -32,7 +32,7 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 
 	protected String name;
 	
-	protected int dop = -1;
+	protected int parallelism = -1;
 
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);
@@ -58,12 +58,12 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	}
 	
 	/**
-	 * Returns the degree of parallelism of this operator.
+	 * Returns the parallelism of this operator.
 	 * 
-	 * @return The degree of parallelism of this operator.
+	 * @return The parallelism of this operator.
 	 */
 	public int getParallelism() {
-		return this.dop;
+		return this.parallelism;
 	}
 
 	/**
@@ -82,17 +82,17 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	}
 	
 	/**
-	 * Sets the degree of parallelism for this operator.
+	 * Sets the parallelism for this operator.
-	 * The degree must be 1 or more.
+	 * The parallelism must be 1 or more.
 	 * 
-	 * @param dop The degree of parallelism for this operator.
-	 * @return The operator with set degree of parallelism.
+	 * @param parallelism The parallelism for this operator.
+	 * @return The operator with set parallelism.
 	 */
-	public O setParallelism(int dop) {
-		if(dop < 1) {
+	public O setParallelism(int parallelism) {
+		if(parallelism < 1) {
 			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
 		}
-		this.dop = dop;
+		this.parallelism = parallelism;
 		
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 68a216b..28c1c29 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -218,7 +218,7 @@ public class OperatorTranslation {
 		iterationOperator.setMaximumNumberOfIterations(iterationEnd.getMaxIterations());
 		
 		if (iterationHead.getParallelism() > 0) {
-			iterationOperator.setDegreeOfParallelism(iterationHead.getParallelism());
+			iterationOperator.setParallelism(iterationHead.getParallelism());
 		}
 
 		DeltaIteration.SolutionSetPlaceHolder<D> solutionSetPlaceHolder = iterationHead.getSolutionSet();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index edb5a68..bf9c8e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -118,7 +118,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 			PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
 			
 			noop.setInput(input);
-			noop.setDegreeOfParallelism(getParallelism());
+			noop.setParallelism(getParallelism());
 			
 			return noop;
 		} 
@@ -131,7 +131,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 				PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
 				
 				noop.setInput(input);
-				noop.setDegreeOfParallelism(getParallelism());
+				noop.setParallelism(getParallelism());
 				noop.setCustomPartitioner(customPartitioner);
 				
 				return noop;
@@ -177,10 +177,10 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		
 		noop.setCustomPartitioner(customPartitioner);
 		
-		// set dop
-		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
-		noop.setDegreeOfParallelism(partitionDop);
-		keyRemovingMap.setDegreeOfParallelism(partitionDop);
+		// set parallelism
+		keyExtractingMap.setParallelism(input.getParallelism());
+		noop.setParallelism(partitionDop);
+		keyRemovingMap.setParallelism(partitionDop);
 		
 		return keyRemovingMap;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 16d9ff3..9b7d567 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -71,8 +71,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType(), context.getConfig());
 		// set input
 		ppo.setInput(input);
-		// set dop
-		ppo.setDegreeOfParallelism(this.getParallelism());
+		// set parallelism
+		ppo.setParallelism(this.getParallelism());
 		ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields, (CompositeType<?>) getInputType()));
 
 		return ppo;

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index d1ad4c3..5951df8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -90,8 +90,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 					new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
 			
 			po.setInput(input);
-			// the degree of parallelism for a non grouped reduce can only be 1
-			po.setDegreeOfParallelism(1);
+			// the parallelism for a non grouped reduce can only be 1
+			po.setParallelism(1);
 			
 			return po;
 		}
@@ -118,7 +118,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 			po.setCustomPartitioner(grouper.getCustomPartitioner());
 			
 			po.setInput(input);
-			po.setDegreeOfParallelism(getParallelism());
+			po.setParallelism(getParallelism());
 			
 			return po;
 		}
@@ -130,7 +130,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	// --------------------------------------------------------------------------------------------
 	
 	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
-			ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int dop)
+			ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int parallelism)
 	{
 		@SuppressWarnings("unchecked")
 		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
@@ -148,10 +148,10 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		reducer.setInput(keyExtractingMap);
 		keyRemovingMap.setInput(reducer);
 		
-		// set dop
-		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
-		reducer.setDegreeOfParallelism(dop);
-		keyRemovingMap.setDegreeOfParallelism(dop);
+		// set parallelism
+		keyExtractingMap.setParallelism(input.getParallelism());
+		reducer.setParallelism(parallelism);
+		keyRemovingMap.setParallelism(parallelism);
 		
 		return keyRemovingMap;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index f6e3c2a..35c564b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -47,8 +47,8 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
 
 	private BlockInfo block;
 
-	public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
-		super(numberOfTuples, blockSize, degreeOfParallelism);
+	public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int parallelism) {
+		super(numberOfTuples, blockSize, parallelism);
 
         resultType = TypeExtractor.getForObject(getRecord(0));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 08eefff..63b4052 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -39,8 +39,8 @@ public class AggregateTranslationTest {
 	@Test
 	public void translateAggregate() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 			
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Double, StringValue, Long>> initialData = 
@@ -58,7 +58,7 @@ public class AggregateTranslationTest {
 			assertEquals(1, reducer.getKeyColumns(0).length);
 			assertEquals(0, reducer.getKeyColumns(0)[0]);
 			
-			assertEquals(-1, reducer.getDegreeOfParallelism());
+			assertEquals(-1, reducer.getParallelism());
 			assertTrue(reducer.isCombinable());
 			
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index 55a2aff..ae89780 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -59,14 +59,14 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			final int[] ITERATION_KEYS = new int[] {2};
 			final int NUM_ITERATIONS = 13;
 			
-			final int DEFAULT_DOP= 133;
-			final int ITERATION_DOP = 77;
+			final int DEFAULT_parallelism= 133;
+			final int ITERATION_parallelism = 77;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
 			// ------------ construct the test program ------------------
 			{
-				env.setDegreeOfParallelism(DEFAULT_DOP);
+				env.setParallelism(DEFAULT_parallelism);
 				
 				@SuppressWarnings("unchecked")
 				DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@@ -75,7 +75,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
 				
 				DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
-				iteration.name(ITERATION_NAME).parallelism(ITERATION_DOP);
+				iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
 				
 				iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 				
@@ -100,7 +100,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			
 			// ------------- validate the plan ----------------
 			assertEquals(JOB_NAME, p.getJobName());
-			assertEquals(DEFAULT_DOP, p.getDefaultParallelism());
+			assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
 			
 			// validate the iteration
 			GenericDataSinkBase<?> sink1, sink2;
@@ -118,7 +118,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
 			assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
-			assertEquals(ITERATION_DOP, iteration.getDegreeOfParallelism());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
 			assertEquals(ITERATION_NAME, iteration.getName());
 			
 			MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 0cf2ee2..b7fbb78 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -76,8 +76,8 @@ public class DistinctTranslationTest {
 	@Test
 	public void translateDistinctPlain() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
@@ -97,8 +97,8 @@ public class DistinctTranslationTest {
 			// check keys
 			assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0));
 
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
 
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
@@ -112,8 +112,8 @@ public class DistinctTranslationTest {
 	@Test
 	public void translateDistinctPlain2() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 
 			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
 
@@ -133,8 +133,8 @@ public class DistinctTranslationTest {
 			// check keys
 			assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
 
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
 
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
@@ -148,8 +148,8 @@ public class DistinctTranslationTest {
 	@Test
 	public void translateDistinctPosition() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
@@ -169,8 +169,8 @@ public class DistinctTranslationTest {
 			// check keys
 			assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0));
 
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
 
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
@@ -184,8 +184,8 @@ public class DistinctTranslationTest {
 	@Test
 	public void translateDistinctKeySelector() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
@@ -202,9 +202,9 @@ public class DistinctTranslationTest {
 			PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
 			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
 
-			// check the DOPs
-			assertEquals(1, keyExtractor.getDegreeOfParallelism());
-			assertEquals(4, reducer.getDegreeOfParallelism());
+			// check the parallelisms
+			assertEquals(1, keyExtractor.getParallelism());
+			assertEquals(4, reducer.getParallelism());
 
 			// check types
 			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
@@ -232,8 +232,8 @@ public class DistinctTranslationTest {
 	@Test
 	public void translateDistinctExpressionKey() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 
 			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
 
@@ -253,8 +253,8 @@ public class DistinctTranslationTest {
 			// check keys
 			assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
 
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
 
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index f9253f8..b578eb7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -45,8 +45,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
 	@Test
 	public void translateNonGroupedReduce() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 			
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 			
@@ -69,8 +69,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			// check keys
 			assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
 			
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == -1);
 			
 			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
 		}
@@ -84,8 +84,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
 	@Test
 	public void translateGroupedReduceNoMapper() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 			
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 			
@@ -108,8 +108,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
 			
-			// DOP was not configured on the operator
-			assertTrue(reducer.getDegreeOfParallelism() == DOP || reducer.getDegreeOfParallelism() == -1);
+			// parallelism was not configured on the operator
+			assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == -1);
 			
 			// check keys
 			assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
@@ -127,8 +127,8 @@ public class ReduceTranslationTests implements java.io.Serializable {
 	@Test
 	public void translateGroupedReduceWithkeyExtractor() {
 		try {
-			final int DOP = 8;
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+			final int parallelism = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
 			
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 			
@@ -154,10 +154,10 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
 			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
 			
-			// check the DOPs
-			assertEquals(1, keyExtractor.getDegreeOfParallelism());
-			assertEquals(4, reducer.getDegreeOfParallelism());
-			assertEquals(4, keyProjector.getDegreeOfParallelism());
+			// check the parallelisms
+			assertEquals(1, keyExtractor.getParallelism());
+			assertEquals(4, reducer.getParallelism());
+			assertEquals(4, keyProjector.getParallelism());
 			
 			// check types
 			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index 2101428..90421b7 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -288,9 +288,9 @@ public class Optimizer {
 	private final CostEstimator costEstimator;
 
 	/**
-	 * The default degree of parallelism for jobs compiled by this compiler.
+	 * The default parallelism for jobs compiled by this compiler.
 	 */
-	private int defaultDegreeOfParallelism;
+	private int defaultParallelism;
 
 
 	// ------------------------------------------------------------------------
@@ -348,14 +348,14 @@ public class Optimizer {
 		this.costEstimator = estimator;
 
 		// determine the default parallelism
-		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		this.defaultParallelism = GlobalConfiguration.getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM);
 		
-		if (defaultDegreeOfParallelism < 1) {
-			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
-					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
-			this.defaultDegreeOfParallelism = 1;
+		if (defaultParallelism < 1) {
+			// name the offending option by its configuration key (not its default value); fall back to 1
+			LOG.warn("Config value " + defaultParallelism + " for option " + ConfigConstants.DEFAULT_PARALLELISM_KEY + " is invalid. Ignoring and using a value of 1.");
+			this.defaultParallelism = 1;
 		}
 	}
 	
@@ -363,13 +363,13 @@ public class Optimizer {
 	//                             Getters / Setters
 	// ------------------------------------------------------------------------
 	
-	public int getDefaultDegreeOfParallelism() {
-		return defaultDegreeOfParallelism;
+	public int getDefaultParallelism() {
+		return defaultParallelism;
 	}
 	
-	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
-		if (defaultDegreeOfParallelism > 0) {
-			this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
+	public void setDefaultParallelism(int defaultParallelism) {
+		if (defaultParallelism > 0) {
+			this.defaultParallelism = defaultParallelism;
 		} else {
 			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
 		}
@@ -435,7 +435,7 @@ public class Optimizer {
 		final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
 
 		final int defaultParallelism = program.getDefaultParallelism() > 0 ?
-			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
+			program.getDefaultParallelism() : this.defaultParallelism;
 
 		// log the default settings
 		LOG.debug("Using a default parallelism of {}",  defaultParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 068799e..1600a50 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -122,12 +122,12 @@ public class BinaryUnionNode extends TwoInputNode {
 		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
 		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
 
-		final int dop = getParallelism();
-		final int inDop1 = getFirstPredecessorNode().getParallelism();
-		final int inDop2 = getSecondPredecessorNode().getParallelism();
+		final int parallelism = getParallelism();
+		final int inParallelism1 = getFirstPredecessorNode().getParallelism();
+		final int inParallelism2 = getSecondPredecessorNode().getParallelism();
 
-		final boolean dopChange1 = dop != inDop1;
-		final boolean dopChange2 = dop != inDop2;
+		final boolean dopChange1 = parallelism != inParallelism1;
+		final boolean dopChange2 = parallelism != inParallelism2;
 
 		final boolean input1breakPipeline = this.input1.isBreakingPipeline();
 		final boolean input2breakPipeline = this.input2.isBreakingPipeline();
@@ -152,8 +152,8 @@ public class BinaryUnionNode extends TwoInputNode {
 						// free to choose the ship strategy
 						igps.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
 						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
+						// if the parallelism changed, make sure that we cancel out properties, unless the
+						// ship strategy preserves/establishes them even under changing parallelisms
 						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
 							c1.getGlobalProperties().reset();
 						}
@@ -179,8 +179,8 @@ public class BinaryUnionNode extends TwoInputNode {
 						// free to choose the ship strategy
 						igps.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
 						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
+						// if the parallelism changed, make sure that we cancel out properties, unless the
+						// ship strategy preserves/establishes them even under changing parallelisms
 						if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
 							c2.getGlobalProperties().reset();
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 55b8785..5dd868e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -131,14 +131,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	 */
 	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
 		
-		// check if the root of the step function has the same DOP as the iteration
+		// check if the root of the step function has the same parallelism as the iteration
 		// or if the step function has any operator at all
 		if (nextPartialSolution.getParallelism() != getParallelism() ||
 			nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
 		{
 			// add a no-op to the root to express the re-partitioning
 			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getParallelism());
+			noop.setParallelism(getParallelism());
 
 			DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
 			noop.setIncomingConnection(noOpConn);
@@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 					locPropsReq.parameterizeChannel(toNoOp);
 					
 					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
-					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+					rebuildPropertiesNode.setParallelism(candidate.getParallelism());
 					
 					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
 					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index dbe04f4..6ca1149 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -204,11 +204,11 @@ public class DataSinkNode extends OptimizerNode {
 		List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
 		List<PlanNode> outputPlans = new ArrayList<PlanNode>();
 		
-		final int dop = getParallelism();
+		final int parallelism = getParallelism();
 		final int inDop = getPredecessorNode().getParallelism();
 
 		final ExecutionMode executionMode = this.input.getDataExchangeMode();
-		final boolean dopChange = dop != inDop;
+		final boolean dopChange = parallelism != inDop;
 		final boolean breakPipeline = this.input.isBreakingPipeline();
 
 		InterestingProperties ips = this.input.getInterestingProperties();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index e4b35b7..6010f6a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -75,7 +75,7 @@ public class DataSourceNode extends OptimizerNode {
 		}
 		
 		if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
-			setDegreeOfParallelism(1);
+			setParallelism(1);
 			this.sequentialInput = true;
 		} else {
 			this.sequentialInput = false;
@@ -115,10 +115,10 @@ public class DataSourceNode extends OptimizerNode {
 	}
 
 	@Override
-	public void setDegreeOfParallelism(int degreeOfParallelism) {
+	public void setParallelism(int parallelism) {
 		// if unsplittable, parallelism remains at 1
 		if (!this.sequentialInput) {
-			super.setDegreeOfParallelism(degreeOfParallelism);
+			super.setParallelism(parallelism);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 564c0d3..d25fed9 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -45,7 +45,7 @@ public class GroupCombineNode extends SingleInputNode {
 
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
+			setParallelism(1);
 		}
 
 		this.possibleProperties = initPossibleProperties();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 77acae5..227b75f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -53,7 +53,7 @@ public class GroupReduceNode extends SingleInputNode {
 		
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
+			setParallelism(1);
 		}
 		
 		this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index 0cad34e..6bf43ea 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -99,7 +99,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	// --------------------------------- General Parameters ---------------------------------------
 	
 	private int parallelism = -1; // the number of parallel instances of this node
-	
+
 	private long minimalMemoryPerSubTask = -1;
 
 	protected int id = -1; 				// the id for this node.
@@ -390,9 +390,9 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * @param parallelism The parallelism to set.
 	 * @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
 	 */
-	public void setDegreeOfParallelism(int parallelism) {
+	public void setParallelism(int parallelism) {
 		if (parallelism < 1 && parallelism != -1) {
-			throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
+			throw new IllegalArgumentException("Parallelism of " + parallelism + " is invalid.");
 		}
 		this.parallelism = parallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 1477038..52bfb6a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -43,7 +43,7 @@ public class ReduceNode extends SingleInputNode {
 		
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
+			setParallelism(1);
 		}
 		
 		OperatorDescriptorSingle props = this.keys == null ?

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index cc12bb8..e9b31f4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -283,9 +283,10 @@ public abstract class SingleInputNode extends OptimizerNode {
 
 		final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
 
-		final int dop = getParallelism();
-		final int inDop = getPredecessorNode().getParallelism();
-		final boolean dopChange = inDop != dop;
+		final int parallelism = getParallelism();
+		final int inParallelism = getPredecessorNode().getParallelism();
+
+		final boolean parallelismChange = inParallelism != parallelism;
 
 		final boolean breaksPipeline = this.inConn.isBreakingPipeline();
 
@@ -293,8 +294,8 @@ public abstract class SingleInputNode extends OptimizerNode {
 		for (PlanNode child : subPlans) {
 
 			if (child.getGlobalProperties().isFullyReplicated()) {
-				// fully replicated input is always locally forwarded if DOP is not changed
-				if (dopChange) {
+				// fully replicated input is always locally forwarded if the parallelism is not changed
+				if (parallelismChange) {
 					// can not continue with this child
 					childrenSkippedDueToReplicatedInput = true;
 					continue;
@@ -307,11 +308,11 @@ public abstract class SingleInputNode extends OptimizerNode {
 				// pick the strategy ourselves
 				for (RequestedGlobalProperties igps: intGlobal) {
 					final Channel c = new Channel(child, this.inConn.getMaterializationMode());
-					igps.parameterizeChannel(c, dopChange, executionMode, breaksPipeline);
+					igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);
 					
-					// if the DOP changed, make sure that we cancel out properties, unless the
-					// ship strategy preserves/establishes them even under changing DOPs
-					if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
+					// if the parallelism changed, make sure that we cancel out properties, unless the
+					// ship strategy preserves/establishes them even under changing parallelisms
+					if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) {
 						c.getGlobalProperties().reset();
 					}
 					
@@ -339,7 +340,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 					c.setShipStrategy(shipStrategy, exMode);
 				}
 				
-				if (dopChange) {
+				if (parallelismChange) {
 					c.adjustGlobalPropertiesForFullParallelismChange();
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 40725ba..06606f0 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -47,7 +47,7 @@ public class SinkJoiner extends TwoInputNode {
 		this.input1 = conn1;
 		this.input2 = conn2;
 		
-		setDegreeOfParallelism(1);
+		setParallelism(1);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index 39da165..f3122ba 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -352,12 +352,12 @@ public abstract class TwoInputNode extends OptimizerNode {
 		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
 		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
 
-		final int dop = getParallelism();
-		final int inDop1 = getFirstPredecessorNode().getParallelism();
-		final int inDop2 = getSecondPredecessorNode().getParallelism();
+		final int parallelism = getParallelism();
+		final int inParallelism1 = getFirstPredecessorNode().getParallelism();
+		final int inParallelism2 = getSecondPredecessorNode().getParallelism();
 
-		final boolean dopChange1 = dop != inDop1;
-		final boolean dopChange2 = dop != inDop2;
+		final boolean dopChange1 = parallelism != inParallelism1;
+		final boolean dopChange2 = parallelism != inParallelism2;
 
 		final boolean input1breaksPipeline = this.input1.isBreakingPipeline();
 		final boolean input2breaksPipeline = this.input2.isBreakingPipeline();
@@ -369,7 +369,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 		for (PlanNode child1 : subPlans1) {
 
 			if (child1.getGlobalProperties().isFullyReplicated()) {
-				// fully replicated input is always locally forwarded if DOP is not changed
+				// fully replicated input is always locally forwarded if parallelism is not changed
 				if (dopChange1) {
 					// can not continue with this child
 					childrenSkippedDueToReplicatedInput = true;
@@ -382,7 +382,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 			for (PlanNode child2 : subPlans2) {
 
 				if (child2.getGlobalProperties().isFullyReplicated()) {
-					// fully replicated input is always locally forwarded if DOP is not changed
+					// fully replicated input is always locally forwarded if parallelism is not changed
 					if (dopChange2) {
 						// can not continue with this child
 						childrenSkippedDueToReplicatedInput = true;
@@ -405,8 +405,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 						// free to choose the ship strategy
 						igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline);
 						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
+						// if the parallelism changed, make sure that we cancel out properties, unless the
+						// ship strategy preserves/establishes them even under changing parallelisms
 						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
 							c1.getGlobalProperties().reset();
 						}
@@ -434,8 +434,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 							// free to choose the ship strategy
 							igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
 							
-							// if the DOP changed, make sure that we cancel out properties, unless the
-							// ship strategy preserves/establishes them even under changing DOPs
+							// if the parallelism changed, make sure that we cancel out properties, unless the
+							// ship strategy preserves/establishes them even under changing parallelisms
 							if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
 								c2.getGlobalProperties().reset();
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index e85f289..99c868c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -167,7 +167,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		// if the next workset is equal to the workset, we need to inject a no-op node
 		if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
 			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getParallelism());
+			noop.setParallelism(getParallelism());
 
 			DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
 			noop.setIncomingConnection(noOpConn);
@@ -179,7 +179,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		// attach an extra node to the solution set delta for the cases where we need to repartition
 		UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
 				new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
-		solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
+		solutionSetDeltaUpdateAux.setParallelism(getParallelism());
 
 		DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
 		solutionSetDeltaUpdateAux.setIncomingConnection(conn);
@@ -371,7 +371,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
 																							FieldList.EMPTY_LIST);
 					
-					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+					rebuildWorksetPropertiesNode.setParallelism(candidate.getParallelism());
 					
 					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
 												rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
@@ -563,7 +563,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		SingleRootJoiner() {
 			super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
 			
-			setDegreeOfParallelism(1);
+			setParallelism(1);
 		}
 		
 		public void setInputs(DagConnection input1, DagConnection input2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
index 8c3f6bd..3646d74 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
@@ -340,7 +340,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * the desired global properties.
 	 * 
 	 * @param channel The channel to parametrize.
-	 * @param globalDopChange Flag indicating whether the degree of parallelism changes
+	 * @param globalDopChange Flag indicating whether the parallelism changes
 	 *                        between sender and receiver.
 	 * @param exchangeMode The mode of data exchange (pipelined, always batch,
 	 *                     batch only on shuffle, ...)

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
index b3c083a..4990a5d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
@@ -53,9 +53,9 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
-			// create an input node for combine with same DOP as input node
+			// create an input node for combine with same parallelism as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+			combinerNode.setParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
 					"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
index a172a60..bd600e4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
@@ -52,9 +52,9 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
-			// create an input node for combine with same DOP as input node
+			// create an input node for combine with same parallelism as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+			combinerNode.setParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
 					"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
index b648386..64054a2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -71,7 +71,7 @@ public final class GroupCombineProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		node.setDegreeOfParallelism(in.getSource().getParallelism());
+		node.setParallelism(in.getSource().getParallelism());
 		
 		// sorting key info
 		SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index c4f47d3..86863d2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,9 +105,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 
-			// create an input node for combine with same DOP as input node
+			// create an input node for combine with same parallelism as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+			combinerNode.setParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
 					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index 2bde29b..e4e6a7f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -47,9 +47,9 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		// create in input node for combine with same DOP as input node
+		// create an input node for combine with the same parallelism as input node
 		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
-		combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+		combinerNode.setParallelism(in.getSource().getParallelism());
 
 		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
 				DriverStrategy.SORTED_GROUP_COMBINE);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 5bb51f3..81afe1e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -69,9 +69,9 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
-			// create an input node for combine with same DOP as input node
+			// create an input node for combine with same parallelism as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+			combinerNode.setParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
 								"Combine ("+node.getOperator().getName()+")", toCombiner,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 875d1c3..4f8b1be 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -473,7 +473,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	public void adjustGlobalPropertiesForFullParallelismChange() {
 		if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
-			throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
+			throw new IllegalStateException("Cannot adjust channel for parallelism " +
 					"change before the ship strategy is set.");
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
index 451484d..c93d8c2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -40,7 +40,7 @@ public class SinkJoinerPlanNode extends DualInputPlanNode {
 	public void setCosts(Costs nodeCosts) {
 		// the plan enumeration logic works as for regular two-input-operators, which is important
 		// because of the branch handling logic. it does pick redistributing network channels
-		// between the sink and the sink joiner, because sinks joiner has a different DOP than the sink.
+		// between the sink and the sink joiner, because the sink joiner has a different parallelism than the sink.
 		// we discard any cost and simply use the sum of the costs from the two children.
 		
 		Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 6f918c0..b04cdd8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -275,7 +275,7 @@ public class PlanJSONDumpGenerator {
 		// output node contents
 		writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
 
-		// degree of parallelism
+		// parallelism
 		writer.print(",\n\t\t\"parallelism\": \""
 			+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 04bc527..dc21c13 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -264,15 +264,15 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				// for the bulk iteration, we skip creating anything for now. we create the graph
 				// for the step function in the post visit.
 				
-				// check that the root of the step function has the same DOP as the iteration.
-				// because the tail must have the same DOP as the head, we can only merge the last
-				// operator with the tail, if they have the same DOP. not merging is currently not
+				// check that the root of the step function has the same parallelism as the iteration.
+				// because the tail must have the same parallelism as the head, we can only merge the last
+				// operator with the tail, if they have the same parallelism. not merging is currently not
 				// implemented
 				PlanNode root = iterationNode.getRootOfStepFunction();
 				if (root.getParallelism() != node.getParallelism())
 				{
 					throw new CompilerException("Error: The final operator of the step " +
-							"function has a different degree of parallelism than the iteration operator itself.");
+							"function has a different parallelism than the iteration operator itself.");
 				}
 				
 				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
@@ -289,12 +289,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				if (nextWorkSet.getParallelism() != node.getParallelism())
 				{
 					throw new CompilerException("It is currently not supported that the final operator of the step " +
-							"function has a different degree of parallelism than the iteration operator itself.");
+							"function has a different parallelism than the iteration operator itself.");
 				}
 				if (solutionSetDelta.getParallelism() != node.getParallelism())
 				{
 					throw new CompilerException("It is currently not supported that the final operator of the step " +
-							"function has a different degree of parallelism than the iteration operator itself.");
+							"function has a different parallelism than the iteration operator itself.");
 				}
 				
 				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
@@ -362,7 +362,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		// check if a vertex was created, or if it was chained or skipped
 		if (vertex != null) {
-			// set degree of parallelism
+			// set parallelism
 			int pd = node.getParallelism();
 			vertex.setParallelism(pd);
 			
@@ -370,10 +370,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			
 			// check whether this vertex is part of an iteration step function
 			if (this.currentIteration != null) {
-				// check that the task has the same DOP as the iteration as such
+				// check that the task has the same parallelism as the iteration as such
 				PlanNode iterationNode = (PlanNode) this.currentIteration;
 				if (iterationNode.getParallelism() < pd) {
-					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
+					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, parallelism than the iteration operator.");
 				}
 
 				// store the id of the iterations the step functions participate in
@@ -725,7 +725,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 					return 1;
 				}
 				else {
-					throw new CompilerException("Error: A changing degree of parallelism is currently " +
+					throw new CompilerException("Error: A changing parallelism is currently " +
 							"not supported between tasks within an iteration.");
 				}
 			} else {
@@ -880,7 +880,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// the step function, if
 		// 1) There is one parent that the partial solution connects to via a forward pattern and no
 		//    local strategy
-		// 2) DOP and the number of subtasks per instance does not change
+		// 2) parallelism and the number of subtasks per instance does not change
 		// 3) That successor is not a union
 		// 4) That successor is not itself the last node of the step function
 		// 5) There is no local strategy on the edge for the initial partial solution, as
@@ -948,7 +948,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// the step function, if
 		// 1) There is one parent that the partial solution connects to via a forward pattern and no
 		//    local strategy
-		// 2) DOP and the number of subtasks per instance does not change
+		// 2) parallelism and the number of subtasks per instance does not change
 		// 3) That successor is not a union
 		// 4) That successor is not itself the last node of the step function
 		// 5) There is no local strategy on the edge for the initial workset, as

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 160ef95..37cffce 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -83,20 +83,20 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 
 	private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
 
-	private final int defaultParallelism; // the default degree of parallelism
+	private final int defaultParallelism; // the default parallelism
 
 	private final GraphCreatingVisitor parent;	// reference to enclosing creator, in case of a recursive translation
 
 	private final ExecutionMode defaultDataExchangeMode;
 
-	private final boolean forceDOP;
+	private final boolean forceParallelism;
 
 
 	public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
 		this(null, false, defaultParallelism, defaultDataExchangeMode, null);
 	}
 
-	private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
+	private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceParallelism, int defaultParallelism,
 									ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
 		if (closure == null){
 			con2node = new HashMap<Operator<?>, OptimizerNode>();
@@ -108,7 +108,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 		this.defaultParallelism = defaultParallelism;
 		this.parent = parent;
 		this.defaultDataExchangeMode = dataExchangeMode;
-		this.forceDOP = forceDOP;
+		this.forceParallelism = forceParallelism;
 	}
 
 	public List<DataSinkNode> getSinks() {
@@ -194,7 +194,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 
 			// catch this for the recursive translation of step functions
 			BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+			p.setParallelism(containingIterationNode.getParallelism());
 			n = p;
 		}
 		else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
@@ -209,7 +209,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 
 			// catch this for the recursive translation of step functions
 			WorksetNode p = new WorksetNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+			p.setParallelism(containingIterationNode.getParallelism());
 			n = p;
 		}
 		else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
@@ -224,7 +224,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 
 			// catch this for the recursive translation of step functions
 			SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+			p.setParallelism(containingIterationNode.getParallelism());
 			n = p;
 		}
 		else {
@@ -233,13 +233,13 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 
 		this.con2node.put(c, n);
 
-		// set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
+		// set the parallelism only if it has not been set before. some nodes have a fixed parallelism, such as the
 		// key-less reducer (all-reduce)
 		if (n.getParallelism() < 1) {
-			// set the degree of parallelism
-			int par = c.getDegreeOfParallelism();
+			// set the parallelism
+			int par = c.getParallelism();
 			if (par > 0) {
-				if (this.forceDOP && par != this.defaultParallelism) {
+				if (this.forceParallelism && par != this.defaultParallelism) {
 					par = this.defaultParallelism;
 					Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
 						"currently fixed to the parallelism of the surrounding operator (the iteration).");
@@ -247,7 +247,7 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 			} else {
 				par = this.defaultParallelism;
 			}
-			n.setDegreeOfParallelism(par);
+			n.setParallelism(par);
 		}
 
 		return true;