You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:23 UTC

[3/9] flink git commit: [FLINK-1679] use a consistent name for parallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
index 8a324a3..8197e27 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
@@ -37,10 +37,10 @@ import org.apache.flink.util.Collector;
 
 @SuppressWarnings("deprecation")
 public class MatchJoinCancelingITCase extends CancellingTestBase {
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
 	public MatchJoinCancelingITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
@@ -60,7 +60,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 3000, 10*1000);
 	}
@@ -81,7 +81,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 5000, 10*1000);
 	}
@@ -102,7 +102,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 5000);
 		
@@ -129,7 +129,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 30 * 1000, 30 * 1000);
 	}
@@ -157,7 +157,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 10 * 1000, 20 * 1000);
 	}
@@ -183,7 +183,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 		GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}
@@ -191,7 +191,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
 	// -------------------------------------- Test System corner cases ---------------------------------
 	
 //	@Test
-	public void testCancelSortMatchWithHighDOP() throws Exception {
+	public void testCancelSortMatchWithHighparallelism() throws Exception {
 		
 		GenericDataSource<InfiniteIntegerInputFormat> source1 =
 			new GenericDataSource<InfiniteIntegerInputFormat>(new InfiniteIntegerInputFormat(), "Source 1");

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 53b70ad..62e2893 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 public class LocalExecutorITCase {
 
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
 	@Test
 	public void testLocalExecutorWithWordCount() {
@@ -50,11 +50,11 @@ public class LocalExecutorITCase {
 
 			LocalExecutor executor = new LocalExecutor();
 			executor.setDefaultOverwriteFiles(true);
-			executor.setTaskManagerNumSlots(DOP);
+			executor.setTaskManagerNumSlots(parallelism);
 			executor.setPrintStatusDuringExecution(false);
 			executor.start();
 			
-			executor.executePlan(wc.getPlan(Integer.valueOf(DOP).toString(), inFile.toURI().toString(),
+			executor.executePlan(wc.getPlan(Integer.valueOf(parallelism).toString(), inFile.toURI().toString(),
 					outFile.toURI().toString()));
 			executor.stop();
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
index a69b5c2..5c1c500 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
@@ -28,7 +28,7 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected String resultPath;
 
 	public WordCountITCase(){
-		setDegreeOfParallelism(4);
+		setParallelism(4);
 		setNumTaskManagers(2);
 		setTaskManagerNumSlots(2);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index 9021c6a..0d2c469 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
 @SuppressWarnings("deprecation")
 public class TaskFailureITCase extends FailingTestBase {
 
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
 	// input for map tasks
 	private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
@@ -57,7 +57,7 @@ public class TaskFailureITCase extends FailingTestBase {
 	private String resultPath;
 
 	public TaskFailureITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -85,7 +85,7 @@ public class TaskFailureITCase extends FailingTestBase {
 
 		// generate plan
 		Plan plan = new Plan(output);
-		plan.setDefaultParallelism(DOP);
+		plan.setDefaultParallelism(parallelism);
 
 		// optimize and compile plan 
 		Optimizer pc = new Optimizer(new DataStatistics());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 0a4673a..9023a1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -58,7 +58,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public CoGroupConnectedComponentsITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	
@@ -71,7 +71,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 	
 	@Override
 	protected Plan getTestJob() {
-		return getPlan(DOP, verticesPath, edgesPath, resultPath, 100);
+		return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
index f6ada63..df3c00d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
@@ -41,7 +41,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public ConnectedComponentsITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	
@@ -55,7 +55,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		WorksetConnectedComponents cc = new WorksetConnectedComponents();
-		return cc.getPlan(Integer.valueOf(DOP).toString(),  verticesPath, edgesPath, resultPath, "100");
+		return cc.getPlan(Integer.valueOf(parallelism).toString(),  verticesPath, edgesPath, resultPath, "100");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index 0b1e372..d5d150d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -65,7 +65,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
 	
 	public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -78,7 +78,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
 	@Override
 	protected Plan getTestJob() {
 		boolean extraMapper = config.getBoolean("ExtraMapper", false);
-		return getPlan(DOP, verticesPath, edgesPath, resultPath, 100, extraMapper);
+		return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100, extraMapper);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index 33650c5..b97d0ad 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -60,7 +60,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
 	protected String resultPath;
 
 	public ConnectedComponentsWithSolutionSetFirstITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	
@@ -73,7 +73,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
 	
 	@Override
 	protected Plan getTestJob() {
-		return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(DOP, verticesPath, edgesPath,
+		return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(parallelism, verticesPath, edgesPath,
 				resultPath, 100);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index 39617b4..e2d095d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -40,7 +40,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		DanglingPageRank pr = new DanglingPageRank();
 		Plan plan = pr.getPlan(
-			String.valueOf(DOP),
+			String.valueOf(parallelism),
 			pagesPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
index 7f5015e..caa9d37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -40,7 +40,7 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
 	protected void testProgram() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(1);
+			env.setParallelism(1);
 
 			DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, 9).map(new Duplicator<Long>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
index cf59a3f..bf459c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
@@ -42,7 +42,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	
 	@Override
 	protected Plan getTestJob() {
-		String[] params = { String.valueOf(DOP) , verticesPath, edgesPath, resultPath, "3" };
+		String[] params = { String.valueOf(parallelism) , verticesPath, edgesPath, resultPath, "3" };
 		
 		WorksetConnectedComponents cc = new WorksetConnectedComponents();
 		return cc.getPlan(params);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index 15079ec..0635fe5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	
 	private static final int MAX_ITERATIONS = 20;
-	private static final int DOP = 1;
+	private static final int parallelism = 1;
 
 	protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
 	protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -51,7 +51,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	private String expectedResult;
 
 	public DependencyConnectedComponentsITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	
@@ -113,7 +113,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		public static String runProgram(String resultPath) throws Exception {
 			
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DOP);
+			env.setParallelism(parallelism);
 			
 			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
 			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index 7b6cf11..0915a42 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -47,7 +47,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
 	protected String resultPath;
 
 	public IterationTerminationWithTerminationTail(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -63,7 +63,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		return getTestPlanPlan(DOP, dataPath, resultPath);
+		return getTestPlanPlan(parallelism, dataPath, resultPath);
 	}
 	
 	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index f05b1c2..3ce021b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -47,7 +47,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
 	protected String resultPath;
 
 	public IterationTerminationWithTwoTails(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	@Override
@@ -63,7 +63,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		return getTestPlanPlan(DOP, dataPath, resultPath);
+		return getTestPlanPlan(parallelism, dataPath, resultPath);
 	}
 	
 	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index 3116a9d..cb16c15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -61,7 +61,7 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		Plan plan = getTestPlanPlan(DOP, dataPath, resultPath);
+		Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
 		return plan;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index f626b75..c11c9ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -52,7 +52,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
 
 	public IterationWithChainingITCase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	@Override
@@ -74,7 +74,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
+		config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism);
 		return toParameterList(config1);
 	}
 
@@ -103,7 +103,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
 	static Plan getTestPlan(int numSubTasks, String input, String output) {
 
 		FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
-		initialInput.setDegreeOfParallelism(1);
+		initialInput.setParallelism(1);
 
 		BulkIteration iteration = new BulkIteration("Loop");
 		iteration.setInput(initialInput);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
index 4cd72fd..ac3659a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
@@ -35,7 +35,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public IterativeKMeansITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -48,7 +48,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
+		return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
index 2491028..fcf43df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
@@ -35,7 +35,7 @@ public class KMeansITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public KMeansITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	@Override
@@ -48,7 +48,7 @@ public class KMeansITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
+		return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
index 450b360..fa13656 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
@@ -59,7 +59,7 @@
 //
 //	public LineRankITCase(Configuration config) {
 //		super(config);
-//		setTaskManagerNumSlots(DOP);
+//		setTaskManagerNumSlots(parallelism);
 //	}
 //
 //	@Override
@@ -85,7 +85,7 @@
 //	@Parameters
 //	public static Collection<Object[]> getConfigurations() {
 //		Configuration config1 = new Configuration();
-//		config1.setInteger("NumSubtasks", DOP);
+//		config1.setInteger("NumSubtasks", parallelism);
 //		config1.setInteger("NumIterations", 5);
 //		return toParameterList(config1);
 //	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
index 8e42dd7..946d89b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
@@ -43,7 +43,7 @@ public class PageRankITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		SimplePageRank pr = new SimplePageRank();
 		Plan plan = pr.getPlan(
-			String.valueOf(DOP), 
+			String.valueOf(parallelism),
 			pagesPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 5aa6b42..44544d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -51,7 +51,7 @@ import org.junit.runners.Parameterized;
 public class AggregatorsITCase extends MultipleProgramsTestBase {
 
 	private static final int MAX_ITERATIONS = 20;
-	private static final int DOP = 2;
+	private static final int parallelism = 2;
 	private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
 
 	public AggregatorsITCase(TestExecutionMode mode){
@@ -81,7 +81,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DOP);
+		env.setParallelism(parallelism);
 
 		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
 		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -110,7 +110,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DOP);
+		env.setParallelism(parallelism);
 
 		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
 		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -139,7 +139,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DOP);
+		env.setParallelism(parallelism);
 
 		DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
 		IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -168,7 +168,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DOP);
+		env.setParallelism(parallelism);
 
 		DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
 
@@ -202,7 +202,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DOP);
+		env.setParallelism(parallelism);
 
 		DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index faaa541..8bf50de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaProgramTestBase {
 
 	private static final int MAX_ITERATIONS = 5;
-	private static final int DOP = 1;
+	private static final int parallelism = 1;
 
 	protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
 	protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -118,7 +118,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 		public static String runProgram(String resultPath) throws Exception {
 
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DOP);
+			env.setParallelism(parallelism);
 
 			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
 			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 4d890e9..e616a2b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -44,7 +44,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 public class ConnectedComponentsWithParametrizableConvergenceITCase extends JavaProgramTestBase {
 
 	private static final int MAX_ITERATIONS = 10;
-	private static final int DOP = 1;
+	private static final int parallelism = 1;
 
 	protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
 	protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -111,7 +111,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 		public static String runProgram(String resultPath) throws Exception {
 
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DOP);
+			env.setParallelism(parallelism);
 
 			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
 			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 0e568b6..7cec122 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -89,9 +89,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	private static final long MEM_PER_CONSUMER = 3;
 
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
-	private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+	private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
 
 	protected String verticesPath;
 
@@ -101,7 +101,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	public ConnectedComponentsNepheleITCase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	@Parameters
@@ -135,14 +135,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		int type = config.getInteger("testcase", 0);
 		switch (type) {
 		case 1:
-			return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
+			return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
 		case 2:
-			return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
+			return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
 		case 3:
-			return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, DOP,
+			return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, parallelism,
 				maxIterations);
 		case 4:
-			return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, DOP,
+			return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, parallelism,
 				maxIterations);
 		default:
 			throw new RuntimeException("Broken test configuration");

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
index f22bc84..516309c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
@@ -40,7 +40,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public DanglingPageRankNepheleITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	
@@ -54,7 +54,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
 		String[] parameters = new String[] {
-			Integer.valueOf(DOP).toString(),
+			Integer.valueOf(parallelism).toString(),
 			pagesWithRankPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
index c4ffd02..ba22ce5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
@@ -29,7 +29,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
 	protected String resultPath;
 
 	public DanglingPageRankWithCombinerNepheleITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -42,7 +42,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
 		String[] parameters = new String[] {
-			Integer.valueOf(DOP).toString(),
+			Integer.valueOf(parallelism).toString(),
 			pagesWithRankPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 2a8e84d..69ff083 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -79,7 +79,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 
 	public IterationWithChainingNepheleITCase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	@Override
@@ -99,7 +99,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", DOP);
+		config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", parallelism);
 		config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
 		return toParameterList(config);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 58969c9..153a85e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -40,20 +40,20 @@ public class JobGraphUtils {
 	private JobGraphUtils() {}
 	
 	public static <T extends FileInputFormat<?>> InputFormatVertex createInput(T stub, String path, String name, JobGraph graph,
-			int degreeOfParallelism)
+			int parallelism)
 	{
 		stub.setFilePath(path);
-		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
+		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, parallelism);
 	}
 
 	private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
-			int degreeOfParallelism)
+			int parallelism)
 	{
 		InputFormatVertex inputVertex = new InputFormatVertex(name);
 		graph.addVertex(inputVertex);
 		
 		inputVertex.setInvokableClass(DataSourceTask.class);
-		inputVertex.setParallelism(degreeOfParallelism);
+		inputVertex.setParallelism(parallelism);
 
 		TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
 		inputConfig.setStubWrapper(stub);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 0705500..8801dd6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -96,7 +96,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
-		int degreeOfParallelism = 2;
+		int parallelism = 2;
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -114,7 +114,7 @@ public class CustomCompensatableDanglingPageRank {
 		double messageLoss = 0.75;
 
 		if (args.length >= 14) {
-			degreeOfParallelism = Integer.parseInt(args[0]);
+			parallelism = Integer.parseInt(args[0]);
 			pageWithRankInputPath = args[1];
 			adjacencyListInputPath = args[2];
 			outputPath = args[3];
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		// page rank input
 		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		// edges as adjacency list
 		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -155,7 +155,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		// --------------- the head ---------------------
 		AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			degreeOfParallelism);
+			parallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -200,7 +200,7 @@ public class CustomCompensatableDanglingPageRank {
 		// --------------- the join ---------------------
 		
 		AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, degreeOfParallelism);
+			"IterationIntermediate", jobGraph, parallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -228,7 +228,7 @@ public class CustomCompensatableDanglingPageRank {
 		// ---------------- the tail (co group) --------------------
 		
 		AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism);
+			parallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
 		tailConfig.setIsWorksetUpdate();
@@ -264,7 +264,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -273,7 +273,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 
-		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -292,7 +292,7 @@ public class CustomCompensatableDanglingPageRank {
 		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
 		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
 
 		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 7bc300f..6f19c03 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -97,7 +97,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
-		int degreeOfParallelism = 2;
+		int parallelism = 2;
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -114,7 +114,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		double messageLoss = 0.75;
 
 		if (args.length >= 14) {
-			degreeOfParallelism = Integer.parseInt(args[0]);
+			parallelism = Integer.parseInt(args[0]);
 			pageWithRankInputPath = args[1];
 			adjacencyListInputPath = args[2];
 			outputPath = args[3];
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		// page rank input
 		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		// edges as adjacency list
 		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -155,7 +155,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		// --------------- the head ---------------------
 		AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			degreeOfParallelism);
+			parallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -200,7 +200,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// --------------- the join ---------------------
 		
 		AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, degreeOfParallelism);
+			"IterationIntermediate", jobGraph, parallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -241,7 +241,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// ---------------- the tail (co group) --------------------
 		
 		AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism);
+			parallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
 		tailConfig.setIsWorksetUpdate();
@@ -278,7 +278,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -287,7 +287,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the auxiliaries ---------------------
 
-		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -306,7 +306,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
 		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
 
 		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index f06f723..8216ccb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -77,7 +77,7 @@ public class CompensatableDanglingPageRank {
 		
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
-		int degreeOfParallelism = 2;
+		int parallelism = 2;
 		String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -95,7 +95,7 @@ public class CompensatableDanglingPageRank {
 		double messageLoss = 0.75;
 
 		if (args.length >= 15) {
-			degreeOfParallelism = Integer.parseInt(args[0]);
+			parallelism = Integer.parseInt(args[0]);
 			pageWithRankInputPath = args[1];
 			adjacencyListInputPath = args[2];
 			outputPath = args[3];
@@ -119,7 +119,7 @@ public class CompensatableDanglingPageRank {
 
 		// page rank input
 		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
@@ -128,14 +128,14 @@ public class CompensatableDanglingPageRank {
 
 		// edges as adjacency list
 		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(recSerializer);
 		adjacencyListInputConfig.setOutputComparator(fieldZeroComparator, 0);
 
 		// --------------- the head ---------------------
-		AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, degreeOfParallelism);
+		AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, parallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -179,7 +179,7 @@ public class CompensatableDanglingPageRank {
 
 		// --------------- the join ---------------------
 		
-		AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, degreeOfParallelism);
+		AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, parallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -207,7 +207,7 @@ public class CompensatableDanglingPageRank {
 		// ---------------- the tail (co group) --------------------
 		
 		AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism);
+			parallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
 		tailConfig.setIsWorksetUpdate();
@@ -244,7 +244,7 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(recSerializer, 0);
@@ -253,7 +253,7 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the auxiliaries ---------------------
 
-		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -272,7 +272,7 @@ public class CompensatableDanglingPageRank {
 		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
 		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
 
 		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index dbf4798..5dc3867 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -61,7 +61,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testIntSortingDOP1() throws Exception {
+	public void testIntSortingParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -76,7 +76,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testStringSortingDOP1() throws Exception {
+	public void testStringSortingParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -98,7 +98,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testTupleSortingSingleAscDOP1() throws Exception {
+	public void testTupleSortingSingleAscParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -134,7 +134,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testTupleSortingSingleDescDOP1() throws Exception {
+	public void testTupleSortingSingleDescParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -169,7 +169,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testTupleSortingDualDOP1() throws Exception {
+	public void testTupleSortingDualParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -204,7 +204,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testTupleSortingNestedDOP1() throws Exception {
+	public void testTupleSortingNestedParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -230,7 +230,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testTupleSortingNestedDOP1_2() throws Exception {
+	public void testTupleSortingNestedParallelism1_2() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -256,7 +256,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testPojoSortingSingleDOP1() throws Exception {
+	public void testPojoSortingSingleParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -279,7 +279,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testPojoSortingDualDOP1() throws Exception {
+	public void testPojoSortingDualParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -306,7 +306,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testPojoSortingNestedDOP1() throws Exception {
+	public void testPojoSortingNestedParallelism1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -333,7 +333,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testSortingDOP4() throws Exception {
+	public void testSortingParallelism4() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
index 494f354..2a97c60 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -321,12 +321,12 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	// check if dop 1 results in the same data like a shuffle
+	// check if a parallelism of 1 results in the same data as a shuffle
 	public void testCheckPartitionShuffleDOP1() throws Exception {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		// data
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index e5f91b4..c5067f9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -138,7 +138,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -346,7 +346,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
+		env.setParallelism(2); // important because it determines how often the combiner is called
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple2<Integer, String>> reduceDs = ds.
@@ -394,7 +394,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * check correctness of groupReduce with descending group sort
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -456,7 +456,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	 	 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -590,7 +590,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * check correctness of groupReduce with descending group sort
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -613,7 +613,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test int-based definition on group sort, for (full) nested Tuple
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer());
@@ -631,7 +631,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test int-based definition on group sort, for (partial) nested Tuple ASC
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		// f0.f0 is first integer
@@ -653,7 +653,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test string-based definition on group sort, for (partial) nested Tuple DESC
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		// f0.f0 is first integer
@@ -672,7 +672,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test string-based definition on group sort, for two grouping keys
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		// f0.f0 is first integer
@@ -691,7 +691,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test string-based definition on group sort, for two grouping keys with Pojos
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
 		// f0.f0 is first integer
@@ -711,7 +711,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
@@ -830,7 +830,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Tuple2<Integer, String>> reduceDs = ds.
@@ -870,7 +870,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
@@ -915,7 +915,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test grouping with pojo containing multiple pojos (was a bug)
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
 
@@ -947,7 +947,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Test Java collections within pojos ( == test kryo)
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
 		// f0.f0 is first integer
@@ -982,7 +982,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 		 * Group by generic type
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 
 		DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 375baee..0080fb1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -527,7 +527,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -548,7 +548,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -569,7 +569,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -590,7 +590,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "((1,1),one),((1,1),one)\n" +
@@ -612,7 +612,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "((1,1),one),((1,1),one)\n" +
@@ -633,7 +633,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds1.join(ds2).where("*").equalTo("*");
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
@@ -655,7 +655,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 				ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
 
 		joinDs.writeAsCsv(resultPath);
-		env.setDegreeOfParallelism(1);
+		env.setParallelism(1);
 		env.execute();
 
 		expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index e1603ca..3637680 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -148,8 +148,8 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		env.execute();
 
 		StringBuilder result = new StringBuilder();
-		int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
-		for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
+		int numPerPartition = 2220 / env.getParallelism() / 10;
+		for (int i = 0; i < env.getParallelism(); i++) {
 			result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
 		}
 
@@ -190,13 +190,13 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testHashPartitionByKeyFieldAndDifferentDOP() throws Exception {
+	public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
 		/*
-		 * Test hash partition by key field and different DOP
+		 * Test hash partition by key field and different parallelism
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(3);
+		env.setParallelism(3);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		DataSet<Long> uniqLongs = ds
@@ -221,7 +221,7 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(3);
+		env.setParallelism(3);
 
 		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
 		DataSet<Long> uniqLongs = ds

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
index 4bba558..d961f3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -72,7 +72,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(4);
+		env.setParallelism(4);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		ds
@@ -94,7 +94,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(2);
+		env.setParallelism(2);
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		ds
@@ -117,7 +117,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(4);
+		env.setParallelism(4);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		ds
@@ -139,7 +139,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(2);
+		env.setParallelism(2);
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		ds
@@ -162,7 +162,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(3);
+		env.setParallelism(3);
 
 		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
 		ds
@@ -185,7 +185,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(3);
+		env.setParallelism(3);
 
 		DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
 		ds
@@ -202,17 +202,17 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testSortPartitionDOPChange() throws Exception {
+	public void testSortPartitionParallelismChange() throws Exception {
 		/*
-		 * Test sort partition with DOP change
+		 * Test sort partition with parallelism change
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(3);
+		env.setParallelism(3);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 		ds
-				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change DOP
+				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
 				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
 				.distinct()
 				.writeAsText(resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 8ddd7bc..aeab77b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -78,7 +78,7 @@ public class AutoParallelismITCase {
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 					"localhost", cluster.getJobManagerRPCPort());
 
-			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+			env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
 
 			DataSet<Integer> result = env
 					.createInput(new ParallelismDependentInputFormat())

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
index c308007..39a08d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -34,7 +34,7 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		if (!isCollectionExecution()) {
-			Assert.assertTrue(env.getDegreeOfParallelism() > 1);
+			Assert.assertTrue(env.getParallelism() > 1);
 		}
 		
 		env.generateSequence(1, 1000)

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
index 2087b63..e0ebadd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
@@ -48,7 +48,7 @@ public class NullValuesITCase {
 			ExecutionEnvironment env =
 					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
 
-			env.setDegreeOfParallelism(1);
+			env.setParallelism(1);
 
 			DataSet<String> data = env.fromElements("hallo")
 					.map(new MapFunction<String, String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 220611d..be05186 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -146,23 +146,23 @@ public class CoGroupITCase extends RecordAPITestBase {
 		FileDataSource input_left =  new FileDataSource(new CoGroupTestInFormat(), leftInPath);
 		DelimitedInputFormat.configureDelimitedFormat(input_left)
 			.recordDelimiter('\n');
-		input_left.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+		input_left.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
 
 		FileDataSource input_right =  new FileDataSource(new CoGroupTestInFormat(), rightInPath);
 		DelimitedInputFormat.configureDelimitedFormat(input_right)
 			.recordDelimiter('\n');
-		input_right.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+		input_right.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
 
 		CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0)
 			.build();
-		testCoGrouper.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+		testCoGrouper.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
 		testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("CoGroupTest#LocalStrategy", ""));
 		testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
 				config.getString("CoGroupTest#ShipStrategy", ""));
 
 		FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath);
-		output.setDegreeOfParallelism(1);
+		output.setParallelism(1);
 
 		output.setInput(testCoGrouper);
 		testCoGrouper.setFirstInput(input_left);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index f6b4127..3fde5a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -122,16 +122,16 @@ public class CrossITCase extends RecordAPITestBase {
 				new ContractITCaseInputFormat(), leftInPath);
 		DelimitedInputFormat.configureDelimitedFormat(input_left)
 			.recordDelimiter('\n');
-		input_left.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+		input_left.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
 
 		FileDataSource input_right = new FileDataSource(
 				new ContractITCaseInputFormat(), rightInPath);
 		DelimitedInputFormat.configureDelimitedFormat(input_right)
 			.recordDelimiter('\n');
-		input_right.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+		input_right.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
 
 		CrossOperator testCross = CrossOperator.builder(new TestCross()).build();
-		testCross.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+		testCross.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
 		testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("CrossTest#LocalStrategy", ""));
 		if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
@@ -151,7 +151,7 @@ public class CrossITCase extends RecordAPITestBase {
 
 		FileDataSink output = new FileDataSink(
 				new ContractITCaseOutputFormat(), resultPath);
-		output.setDegreeOfParallelism(1);
+		output.setParallelism(1);
 
 		output.setInput(testCross);
 		testCross.setFirstInput(input_left);