You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:23 UTC

[02/22] Rework the Taskmanager to a slot based model and remove legacy cloud code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
index feb5f34..95d75fb 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
@@ -39,7 +39,7 @@ public class TransitiveClosureNaiveITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		TransitiveClosureNaive transitiveClosureNaive = new TransitiveClosureNaive();
 		// "2" is the number of iterations here
-		return transitiveClosureNaive.getScalaPlan(4, 2, verticesPath, edgesPath, resultPath);
+		return transitiveClosureNaive.getScalaPlan(DOP, 2, verticesPath, edgesPath, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
index 2ddef0c..67c5ce1 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
@@ -21,6 +21,6 @@ public class WebLogAnalysisITCase extends eu.stratosphere.test.recordJobTests.We
 	@Override
 	protected Plan getTestJob() {
 		WebLogAnalysis webLogAnalysis = new WebLogAnalysis();
-		return webLogAnalysis.getScalaPlan(4, docsPath, ranksPath, visitsPath, resultPath);
+		return webLogAnalysis.getScalaPlan(DOP, docsPath, ranksPath, visitsPath, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
index 205828f..42ee31a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
@@ -22,6 +22,6 @@ public class WordCountITCase extends eu.stratosphere.test.recordJobTests.WordCou
 	@Override
 	protected Plan getTestJob() {
 		WordCount wc = new WordCount();
-		return wc.getScalaPlan(4, textPath, resultPath);
+		return wc.getScalaPlan(DOP, textPath, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
index 4627d48..44b37f6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
@@ -22,6 +22,6 @@ public class WordCountPactValueITCase extends eu.stratosphere.test.recordJobTest
 	@Override
 	protected Plan getTestJob() {
 		WordCountWithUserDefinedType wc = new WordCountWithUserDefinedType();
-		return wc.getScalaPlan(4, textPath, resultPath);
+		return wc.getScalaPlan(DOP, textPath, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
index 12082ac..a8eba29 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
@@ -20,6 +20,6 @@ public class WordCountWithCountFunctionITCase extends eu.stratosphere.test.recor
 
 	@Override
 	protected Plan getTestJob() {
-		return new WordCountWithCount().getScalaPlan(4, textPath, resultPath);
+		return new WordCountWithCount().getScalaPlan(DOP, textPath, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
index 3f4791e..4f5d38d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
@@ -36,6 +36,8 @@ import eu.stratosphere.util.Collector;
  */
 public class TaskFailureITCase extends FailingTestBase {
 
+	private static final int DOP = 4;
+
 	// input for map tasks
 	private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
 											"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n" +
@@ -47,6 +49,10 @@ public class TaskFailureITCase extends FailingTestBase {
 
 	private String inputPath;
 	private String resultPath;
+
+	public TaskFailureITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -73,7 +79,7 @@ public class TaskFailureITCase extends FailingTestBase {
 
 		// generate plan
 		Plan plan = new Plan(output);
-		plan.setDefaultParallelism(4);
+		plan.setDefaultParallelism(DOP);
 
 		// optimize and compile plan 
 		PactCompiler pc = new PactCompiler(new DataStatistics());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
index c937435..f8d82ae 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -50,6 +50,10 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
+
+	public CoGroupConnectedComponentsITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	
 	@Override
@@ -61,7 +65,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
 	
 	@Override
 	protected Plan getTestJob() {
-		return getPlan(4, verticesPath, edgesPath, resultPath, 100);
+		return getPlan(DOP, verticesPath, edgesPath, resultPath, 100);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
index 9c88bb5..53feae6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
@@ -33,6 +33,10 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
+
+	public ConnectedComponentsITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	
 	@Override
@@ -45,7 +49,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		WorksetConnectedComponents cc = new WorksetConnectedComponents();
-		return cc.getPlan("4",  verticesPath, edgesPath, resultPath, "100");
+		return cc.getPlan(String.valueOf(DOP), verticesPath, edgesPath, resultPath, "100"); // DOP as String, no boxing
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index 8de877c..eb2eec0 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -59,6 +59,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
 	
 	public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	@Override
@@ -71,7 +72,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
 	@Override
 	protected Plan getTestJob() {
 		boolean extraMapper = config.getBoolean("ExtraMapper", false);
-		return getPlan(4, verticesPath, edgesPath, resultPath, 100, extraMapper);
+		return getPlan(DOP, verticesPath, edgesPath, resultPath, 100, extraMapper);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index 5c696a2..f040f06 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -51,6 +51,10 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
+
+	public ConnectedComponentsWithSolutionSetFirstITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	
 	@Override
@@ -62,7 +66,8 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
 	
 	@Override
 	protected Plan getTestJob() {
-		return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(4, verticesPath, edgesPath, resultPath, 100);
+		return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(DOP, verticesPath, edgesPath,
+				resultPath, 100);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
index e84f21e..5390eed 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
@@ -35,6 +35,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
 	
 	public DanglingPageRankITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	
@@ -63,7 +64,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("PageRankITCase#NoSubtasks", 4);
+		config1.setInteger("PageRankITCase#NoSubtasks", DOP);
 		config1.setString("PageRankITCase#NumIterations", "25");
 		return toParameterList(config1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
index 8f06929..1eb81ed 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
@@ -36,6 +36,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	
 	public DeltaPageRankITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	@Override
@@ -66,7 +67,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("NumSubtasks", 4);
+		config1.setInteger("NumSubtasks", DOP);
 		config1.setInteger("NumIterations", 3);
 		return toParameterList(config1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
index deda551..e1339e3 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
@@ -28,7 +28,6 @@ import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.test.util.JavaProgramTestBase;
 import eu.stratosphere.util.Collector;
 
-
 /**
  * 
  * Iterative Connected Components test case which recomputes only the elements
@@ -46,6 +45,10 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
 	private String resultPath;
 	private String expectedResult;
+
+	public DependencyConnectedComponentsITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
index 3c38263..50c1970 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
@@ -40,6 +40,10 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
 
 	protected String dataPath;
 	protected String resultPath;
+
+	public IterationTerminationWithTerminationTail(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -54,7 +58,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		return getTestPlanPlan(4, dataPath, resultPath);
+		return getTestPlanPlan(DOP, dataPath, resultPath);
 	}
 	
 	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
index f3c6cbb..cfe0510 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
@@ -42,6 +42,10 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
 	protected String dataPath;
 	protected String resultPath;
 
+	public IterationTerminationWithTwoTails(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", INPUT);
@@ -55,7 +59,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		return getTestPlanPlan(4, dataPath, resultPath);
+		return getTestPlanPlan(DOP, dataPath, resultPath);
 	}
 	
 	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
index c7e28e2..7085776 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
@@ -40,7 +40,9 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
 	protected String dataPath;
 	protected String resultPath;
 
-	
+	public IterationWithAllReducerITCase(){
+		setTaskManagerNumSlots(DOP); // use the shared DOP constant, as getTestJob() does, instead of a literal 4
+	}
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -55,7 +57,7 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		Plan plan = getTestPlanPlan(4, dataPath, resultPath);
+		Plan plan = getTestPlanPlan(DOP, dataPath, resultPath);
 		return plan;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
index 55c7a35..dba561d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
@@ -46,6 +46,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
 
     public IterationWithChainingITCase(Configuration config) {
         super(config);
+        setTaskManagerNumSlots(DOP); // space-indented to match the rest of this file
     }
 
     @Override
@@ -69,7 +70,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
     @Parameters
     public static Collection<Object[]> getConfigurations() {
         Configuration config1 = new Configuration();
-        config1.setInteger("ChainedMapperITCase#NoSubtasks", 4);
+        config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
         return toParameterList(config1);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
index 1272f6e..884dd64 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
@@ -46,6 +46,7 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
 	
 	public IterationWithUnionITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -67,7 +68,7 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("IterationWithUnionITCase#NumSubtasks", 4);
+		config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP);
 
 		return toParameterList(config1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
index c8da45f..199f108 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
@@ -27,6 +27,10 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
 	protected String dataPath;
 	protected String clusterPath;
 	protected String resultPath;
+
+	public IterativeKMeansITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -38,7 +42,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan("4", dataPath, clusterPath, resultPath, "20");
+		return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
index 7085c3c..d67da35 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
@@ -27,7 +27,11 @@ public class KMeansITCase extends RecordAPITestBase {
 	protected String dataPath;
 	protected String clusterPath;
 	protected String resultPath;
-	
+
+	public KMeansITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
@@ -38,7 +42,7 @@ public class KMeansITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan("4", dataPath, clusterPath, resultPath, "20");
+		return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
index f38fb7e..7c68605 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
@@ -54,6 +54,7 @@ public class LineRankITCase extends RecordAPITestBase {
 	
 	public LineRankITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	@Override
@@ -68,7 +69,7 @@ public class LineRankITCase extends RecordAPITestBase {
 		LineRank lr = new LineRank();
 		
 		Plan plan = lr.getScalaPlan(
-			config.getInteger("NumSubtasks", 1), 
+			config.getInteger("NumSubtasks", 1),
 			sourcesPath,
 			targetsPath,
 			9,
@@ -79,7 +80,7 @@ public class LineRankITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("NumSubtasks", 4);
+		config1.setInteger("NumSubtasks", DOP);
 		config1.setInteger("NumIterations", 5);
 		return toParameterList(config1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
index 8b12b53..58d8170 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
@@ -38,6 +38,7 @@ public class PageRankITCase extends RecordAPITestBase {
 	
 	public PageRankITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 	
 	@Override
@@ -64,7 +65,7 @@ public class PageRankITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config1 = new Configuration();
-		config1.setInteger("NumSubtasks", 4);
+		config1.setInteger("NumSubtasks", DOP);
 		config1.setString("NumIterations", "5");
 		return toParameterList(config1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 7eff1aa..b2b3df2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -82,6 +82,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	private static final long MEM_PER_CONSUMER = 3;
 
+	private static final int DOP = 4;
+
+	private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+
 	protected String verticesPath;
 
 	protected String edgesPath;
@@ -90,6 +94,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	public ConnectedComponentsNepheleITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Parameters
@@ -118,20 +123,19 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		int dop = 4;
 		int maxIterations = 100;
 
 		int type = config.getInteger("testcase", 0);
 		switch (type) {
 		case 1:
-			return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, dop, maxIterations);
+			return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
 		case 2:
-			return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, dop, maxIterations);
+			return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
 		case 3:
-			return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, dop,
+			return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, DOP,
 				maxIterations);
 		case 4:
-			return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, dop,
+			return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, DOP,
 				maxIterations);
 		default:
 			throw new RuntimeException("Broken test configuration");
@@ -167,7 +171,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
 		JobInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
-			jobGraph, numSubTasks, numSubTasks);
+			jobGraph, numSubTasks);
 		TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
 		{
 			verticesInputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -199,7 +203,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		@SuppressWarnings("unchecked")
 		CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class);
 		JobInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
-			numSubTasks, numSubTasks);
+			numSubTasks);
 		TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration());
 		{
 			edgesInputConfig.setOutputSerializer(serializer);
@@ -216,7 +220,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			TypePairComparatorFactory<?, ?> pairComparator) {
 
 		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)",
-			jobGraph, numSubTasks, numSubTasks);
+			jobGraph, numSubTasks);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		{
 			headConfig.setIterationId(ITERATION_ID);
@@ -234,7 +238,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			headConfig.setInputComparator(comparator, 1);
 			headConfig.setInputLocalStrategy(1, LocalStrategy.NONE);
 			headConfig.setInputCached(1, true);
-			headConfig.setInputMaterializationMemory(1, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			headConfig.setRelativeInputMaterializationMemory(1, MEM_FRAC_PER_CONSUMER);
 
 			// initial solution set input
 			headConfig.addInputToGroup(2);
@@ -248,8 +252,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 			// back channel / iterations
 			headConfig.setIsWorksetIteration();
-			headConfig.setBackChannelMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
-			headConfig.setSolutionSetMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			headConfig.setRelativeBackChannelMemory(MEM_FRAC_PER_CONSUMER);
+			headConfig.setRelativeSolutionSetMemory(MEM_FRAC_PER_CONSUMER );
 
 			// output into iteration
 			headConfig.setOutputSerializer(serializer);
@@ -273,7 +277,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			headConfig.setDriverComparator(comparator, 0);
 			headConfig.setDriverComparator(comparator, 1);
 			headConfig.setDriverPairComparator(pairComparator);
-			headConfig.setMemoryDriver(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			headConfig.setRelativeMemoryDriver(MEM_FRAC_PER_CONSUMER);
 
 			headConfig.addIterationAggregator(
 				WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
@@ -288,7 +292,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// --------------- the intermediate (reduce to min id) ---------------
 		JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Find Min Component-ID", jobGraph, numSubTasks, numSubTasks);
+			"Find Min Component-ID", jobGraph, numSubTasks);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		{
 			intermediateConfig.setIterationId(ITERATION_ID);
@@ -297,7 +301,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			intermediateConfig.setInputSerializer(serializer, 0);
 			intermediateConfig.setInputComparator(comparator, 0);
 			intermediateConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-			intermediateConfig.setMemoryInput(0, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			intermediateConfig.setRelativeMemoryInput(0, MEM_FRAC_PER_CONSUMER);
 			intermediateConfig.setFilehandlesInput(0, 64);
 			intermediateConfig.setSpillingThresholdInput(0, 0.85f);
 
@@ -316,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
 			TypeSerializerFactory<?> serializer) {
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks, numSubTasks);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 
@@ -341,7 +345,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 	private static JobOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
 		JobOutputVertex fakeTailOutput =
-			JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks);
+			JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		return fakeTailOutput;
 	}
 
@@ -389,7 +393,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// --------------- the tail (solution set join) ---------------
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			numSubTasks, numSubTasks);
+			numSubTasks);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		{
 			tailConfig.setIterationId(ITERATION_ID);
@@ -480,7 +484,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// ------------------ the intermediate (ss join) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Solution Set Join", jobGraph, numSubTasks, numSubTasks);
+			"Solution Set Join", jobGraph, numSubTasks);
 		TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
 		{
 			ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
@@ -509,7 +513,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// -------------------------- ss tail --------------------------------
 		JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail",
-			jobGraph, numSubTasks, numSubTasks);
+			jobGraph, numSubTasks);
 		TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
 		{
 			ssTailConfig.setIterationId(ITERATION_ID);
@@ -520,7 +524,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			ssTailConfig.addInputToGroup(0);
 			ssTailConfig.setInputSerializer(serializer, 0);
 			ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
-			ssTailConfig.setInputMaterializationMemory(0, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
 
 			// output
 			ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -534,7 +538,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// -------------------------- ws tail --------------------------------
 		JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
-			jobGraph, numSubTasks, numSubTasks);
+			jobGraph, numSubTasks);
 		TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
 		{
 			wsTailConfig.setIterationId(ITERATION_ID);
@@ -631,7 +635,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		// ------------------ the intermediate (ws update) ----------------------
 		JobTaskVertex wsUpdateIntermediate =
 			JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph,
-				numSubTasks, numSubTasks);
+				numSubTasks);
 		TaskConfig wsUpdateConfig = new TaskConfig(wsUpdateIntermediate.getConfiguration());
 		{
 			wsUpdateConfig.setIterationId(ITERATION_ID);
@@ -661,7 +665,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		// -------------------------- ss tail --------------------------------
 		JobTaskVertex ssTail =
 			JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph,
-				numSubTasks, numSubTasks);
+				numSubTasks);
 		TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
 		{
 			ssTailConfig.setIterationId(ITERATION_ID);
@@ -754,7 +758,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// ------------------ the intermediate (ss update) ----------------------
 		JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Solution Set Update", jobGraph, numSubTasks, numSubTasks);
+			"Solution Set Update", jobGraph, numSubTasks);
 		TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
 		{
 			ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
@@ -782,7 +786,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// -------------------------- ws tail --------------------------------
 		JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
-			jobGraph, numSubTasks, numSubTasks);
+			jobGraph, numSubTasks);
 		TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
 		{
 			wsTailConfig.setIterationId(ITERATION_ID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
index 872cabc..002533a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
@@ -34,6 +34,10 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
 	protected String edgesPath;
 	protected String resultPath;
 
+	public DanglingPageRankNepheleITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -45,8 +49,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
 		String[] parameters = new String[] {
-			"4",
-			"4",
+			new Integer(DOP).toString(),
 			pagesWithRankPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
index 59d5206..e42620e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
@@ -22,6 +22,10 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
 	protected String pagesWithRankPath;
 	protected String edgesPath;
 	protected String resultPath;
+
+	public DanglingPageRankWithCombinerNepheleITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 	
 	@Override
 	protected void preSubmit() throws Exception {
@@ -33,8 +37,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
 		String[] parameters = new String[] {
-			"4",
-			"4",
+			new Integer(DOP).toString(),
 			pagesWithRankPath,
 			edgesPath,
 			resultPath,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index ef7c9d2..8a68402 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -75,6 +75,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 
 	public IterationWithChainingNepheleITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -94,7 +95,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 	@Parameterized.Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", 2);
+		config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", DOP);
 		config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
 		return toParameterList(config);
 	}
@@ -118,8 +119,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		final TypeComparatorFactory<Record> comparator =
 			new RecordComparatorFactory(new int[] { 0 }, new Class[] { IntValue.class });
 
-		final long MEM_PER_CONSUMER = 2;
-
 		final int ITERATION_ID = 1;
 
 		// --------------------------------------------------------------------------------------------------------------
@@ -128,7 +127,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 
 		// - input -----------------------------------------------------------------------------------------------------
 		JobInputVertex input = JobGraphUtils.createInput(
-			new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks, numSubTasks);
+			new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
 		TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
 		{
 			inputConfig.setOutputSerializer(serializer);
@@ -137,7 +136,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 
 		// - head ------------------------------------------------------------------------------------------------------
 		JobTaskVertex head = JobGraphUtils.createTask(
-			IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks);
+			IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		{
 			headConfig.setIterationId(ITERATION_ID);
@@ -168,12 +167,12 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 			headConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
 
 			// back channel
-			headConfig.setBackChannelMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+			headConfig.setRelativeBackChannelMemory(1.0);
 		}
 
 		// - tail ------------------------------------------------------------------------------------------------------
 		JobTaskVertex tail = JobGraphUtils.createTask(
-			IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks, numSubTasks);
+			IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		{
 			tailConfig.setIterationId(ITERATION_ID);
@@ -210,7 +209,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		}
 
 		// - output ----------------------------------------------------------------------------------------------------
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		{
 			outputConfig.addInputToGroup(0);
@@ -221,7 +220,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		}
 
 		// - fake tail -------------------------------------------------------------------------------------------------
-		JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks, numSubTasks);
+		JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
 
 		// - sync ------------------------------------------------------------------------------------------------------
 		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index a229086..109c91a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -51,14 +51,14 @@ public class JobGraphUtils {
 	}
 	
 	public static <T extends FileInputFormat<?>> JobInputVertex createInput(T stub, String path, String name, JobGraph graph,
-			int degreeOfParallelism, int numSubTasksPerInstance)
+			int degreeOfParallelism)
 	{
 		stub.setFilePath(path);
-		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism, numSubTasksPerInstance);
+		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
 	}
 
 	private static <T extends InputFormat<?,?>> JobInputVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
-			int degreeOfParallelism, int numSubTasksPerInstance)
+			int degreeOfParallelism)
 	{
 		JobInputVertex inputVertex = new JobInputVertex(name, graph);
 		
@@ -67,8 +67,7 @@ public class JobGraphUtils {
 		inputVertex.setInputClass(clazz);
 		
 		inputVertex.setNumberOfSubtasks(degreeOfParallelism);
-		inputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
-		
+
 		TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
 		inputConfig.setStubWrapper(stub);
 		
@@ -89,12 +88,11 @@ public class JobGraphUtils {
 	}
 
 	public static JobTaskVertex createTask(@SuppressWarnings("rawtypes") Class<? extends RegularPactTask> task, String name, JobGraph graph,
-			int degreeOfParallelism, int numSubtasksPerInstance)
+			int degreeOfParallelism)
 	{
 		JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
 		taskVertex.setTaskClass(task);
 		taskVertex.setNumberOfSubtasks(degreeOfParallelism);
-		taskVertex.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
 		return taskVertex;
 	}
 
@@ -107,23 +105,19 @@ public class JobGraphUtils {
 		return sync;
 	}
 
-	public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism,
-			int numSubTasksPerInstance)
+	public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
 		JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
 		outputVertex.setOutputClass(FakeOutputTask.class);
 		outputVertex.setNumberOfSubtasks(degreeOfParallelism);
-		outputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
 		return outputVertex;
 	}
 
-	public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism,
-			int numSubTasksPerInstance)
+	public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
 	{
 		JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
 		sinkVertex.setOutputClass(DataSinkTask.class);
 		sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
-		sinkVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
 		return sinkVertex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index b112741..5b2433e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -92,7 +92,6 @@ public class CustomCompensatableDanglingPageRank {
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
 		int degreeOfParallelism = 2;
-		int numSubTasksPerInstance = degreeOfParallelism;
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -109,31 +108,32 @@ public class CustomCompensatableDanglingPageRank {
 		int failingIteration = 2;
 		double messageLoss = 0.75;
 
-		if (args.length >= 15) {
+		if (args.length >= 14) {
 			degreeOfParallelism = Integer.parseInt(args[0]);
-			numSubTasksPerInstance = Integer.parseInt(args[1]);
-			pageWithRankInputPath = args[2];
-			adjacencyListInputPath = args[3];
-			outputPath = args[4];
-//			confPath = args[5];
-			minorConsumer = Integer.parseInt(args[6]);
-			matchMemory = Integer.parseInt(args[7]);
-			coGroupSortMemory = Integer.parseInt(args[8]);
-			numIterations = Integer.parseInt(args[9]);
-			numVertices = Long.parseLong(args[10]);
-			numDanglingVertices = Long.parseLong(args[11]);
-			failingWorkers = args[12];
-			failingIteration = Integer.parseInt(args[13]);
-			messageLoss = Double.parseDouble(args[14]);
+			pageWithRankInputPath = args[1];
+			adjacencyListInputPath = args[2];
+			outputPath = args[3];
+//			confPath = args[4];
+			minorConsumer = Integer.parseInt(args[5]);
+			matchMemory = Integer.parseInt(args[6]);
+			coGroupSortMemory = Integer.parseInt(args[7]);
+			numIterations = Integer.parseInt(args[8]);
+			numVertices = Long.parseLong(args[9]);
+			numDanglingVertices = Long.parseLong(args[10]);
+			failingWorkers = args[11];
+			failingIteration = Integer.parseInt(args[12]);
+			messageLoss = Double.parseDouble(args[13]);
 		}
 
+		int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
+
 		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
 		
 		// --------------- the inputs ---------------------
 
 		// page rank input
 		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -142,7 +142,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		// edges as adjacency list
 		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -150,7 +150,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		// --------------- the head ---------------------
 		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -160,12 +160,12 @@ public class CustomCompensatableDanglingPageRank {
 		headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
 		headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
 		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeMemoryInput(0, (double) minorConsumer / totalMemoryConsumption);
 		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
 		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
 		
 		// back channel / iterations
-		headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeBackChannelMemory((double) minorConsumer / totalMemoryConsumption);
 		
 		// output into iteration
 		headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
@@ -195,13 +195,13 @@ public class CustomCompensatableDanglingPageRank {
 		// --------------- the join ---------------------
 		
 		JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			"IterationIntermediate", jobGraph, degreeOfParallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
 		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+		intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);
 		intermediateConfig.addInputToGroup(1);
 		intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -223,7 +223,7 @@ public class CustomCompensatableDanglingPageRank {
 		// ---------------- the tail (co group) --------------------
 		
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
         tailConfig.setIsWorksetUpdate();
@@ -240,10 +240,10 @@ public class CustomCompensatableDanglingPageRank {
 		tailConfig.setDriverComparator(vertexWithRankComparator, 1);
 		tailConfig.setDriverPairComparator(coGroupComparator);
 		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
 		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
 		tailConfig.setInputComparator(vertexWithRankComparator, 1);
-		tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeMemoryInput(1, (double) coGroupSortMemory / totalMemoryConsumption);
 		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
 		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
 		
@@ -261,8 +261,7 @@ public class CustomCompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
-			numSubTasksPerInstance);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -272,7 +271,7 @@ public class CustomCompensatableDanglingPageRank {
 		// --------------- the auxiliaries ---------------------
 		
 		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 
 		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 37fab39..698296a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -93,7 +93,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
 		int degreeOfParallelism = 2;
-		int numSubTasksPerInstance = degreeOfParallelism;
 		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -109,31 +108,32 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		int failingIteration = 2;
 		double messageLoss = 0.75;
 
-		if (args.length >= 15) {
+		if (args.length >= 14) {
 			degreeOfParallelism = Integer.parseInt(args[0]);
-			numSubTasksPerInstance = Integer.parseInt(args[1]);
-			pageWithRankInputPath = args[2];
-			adjacencyListInputPath = args[3];
-			outputPath = args[4];
-			// [5] is config path
-			minorConsumer = Integer.parseInt(args[6]);
-			matchMemory = Integer.parseInt(args[7]);
-			coGroupSortMemory = Integer.parseInt(args[8]);
-			numIterations = Integer.parseInt(args[9]);
-			numVertices = Long.parseLong(args[10]);
-			numDanglingVertices = Long.parseLong(args[11]);
-			failingWorkers = args[12];
-			failingIteration = Integer.parseInt(args[13]);
-			messageLoss = Double.parseDouble(args[14]);
+			pageWithRankInputPath = args[1];
+			adjacencyListInputPath = args[2];
+			outputPath = args[3];
+			// [4] is config path
+			minorConsumer = Integer.parseInt(args[5]);
+			matchMemory = Integer.parseInt(args[6]);
+			coGroupSortMemory = Integer.parseInt(args[7]);
+			numIterations = Integer.parseInt(args[8]);
+			numVertices = Long.parseLong(args[9]);
+			numDanglingVertices = Long.parseLong(args[10]);
+			failingWorkers = args[11];
+			failingIteration = Integer.parseInt(args[12]);
+			messageLoss = Double.parseDouble(args[13]);
 		}
 
+		int totalMemoryConsumption = 3*minorConsumer + 2*coGroupSortMemory + matchMemory;
+
 		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
 		
 		// --------------- the inputs ---------------------
 
 		// page rank input
 		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -142,7 +142,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		// edges as adjacency list
 		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -150,7 +150,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		// --------------- the head ---------------------
 		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -160,12 +160,12 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
 		headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
 		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
 		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
 		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
 		
 		// back channel / iterations
-		headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
 		
 		// output into iteration
 		headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
@@ -195,13 +195,13 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// --------------- the join ---------------------
 		
 		JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			"IterationIntermediate", jobGraph, degreeOfParallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
 		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);
 		intermediateConfig.addInputToGroup(1);
 		intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -225,7 +225,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
 		combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 		combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
-		combinerConfig.setMemoryDriver(coGroupSortMemory * JobGraphUtils.MEGABYTE);
+		combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
 		combinerConfig.setOutputSerializer(vertexWithRankSerializer);
 		combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		combinerConfig.setOutputComparator(vertexWithRankComparator, 0);
@@ -235,7 +235,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// ---------------- the tail (co group) --------------------
 		
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
         tailConfig.setIsWorksetUpdate();
@@ -251,10 +251,10 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		tailConfig.setDriverComparator(vertexWithRankComparator, 1);
 		tailConfig.setDriverPairComparator(coGroupComparator);
 		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
 		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
 		tailConfig.setInputComparator(vertexWithRankComparator, 1);
-		tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
 		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
 		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
 		tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -273,8 +273,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
-			numSubTasksPerInstance);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -284,7 +283,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		// --------------- the auxiliaries ---------------------
 		
 		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 
 		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index 944f13b..f870fe6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -72,7 +72,6 @@ public class CompensatableDanglingPageRank {
 	public static JobGraph getJobGraph(String[] args) throws Exception {
 
 		int degreeOfParallelism = 2;
-		int numSubTasksPerInstance = degreeOfParallelism;
 		String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
 		String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
 //			"test-inputs/danglingpagerank/adjacencylists";
@@ -91,29 +90,30 @@ public class CompensatableDanglingPageRank {
 
 		if (args.length >= 15) {
 			degreeOfParallelism = Integer.parseInt(args[0]);
-			numSubTasksPerInstance = Integer.parseInt(args[1]);
-			pageWithRankInputPath = args[2];
-			adjacencyListInputPath = args[3];
-			outputPath = args[4];
-//			confPath = args[5];
-			minorConsumer = Integer.parseInt(args[6]);
-			matchMemory = Integer.parseInt(args[7]);
-			coGroupSortMemory = Integer.parseInt(args[8]);
-			numIterations = Integer.parseInt(args[9]);
-			numVertices = Long.parseLong(args[10]);
-			numDanglingVertices = Long.parseLong(args[11]);
-			failingWorkers = args[12];
-			failingIteration = Integer.parseInt(args[13]);
-			messageLoss = Double.parseDouble(args[14]);
+			pageWithRankInputPath = args[1];
+			adjacencyListInputPath = args[2];
+			outputPath = args[3];
+//			confPath = args[4];
+			minorConsumer = Integer.parseInt(args[5]);
+			matchMemory = Integer.parseInt(args[6]);
+			coGroupSortMemory = Integer.parseInt(args[7]);
+			numIterations = Integer.parseInt(args[8]);
+			numVertices = Long.parseLong(args[9]);
+			numDanglingVertices = Long.parseLong(args[10]);
+			failingWorkers = args[11];
+			failingIteration = Integer.parseInt(args[12]);
+			messageLoss = Double.parseDouble(args[13]);
 		}
 
+		int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
+
 		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
 		
 		// --------------- the inputs ---------------------
 
 		// page rank input
 		JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
 		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
 		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
@@ -122,7 +122,7 @@ public class CompensatableDanglingPageRank {
 
 		// edges as adjacency list
 		JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
 		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
 		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
 		adjacencyListInputConfig.setOutputSerializer(recSerializer);
@@ -130,7 +130,7 @@ public class CompensatableDanglingPageRank {
 
 		// --------------- the head ---------------------
 		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
 		
@@ -140,12 +140,12 @@ public class CompensatableDanglingPageRank {
 		headConfig.setInputSerializer(recSerializer, 0);
 		headConfig.setInputComparator(fieldZeroComparator, 0);
 		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
 		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
 		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
 		
 		// back channel / iterations
-		headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+		headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
 		
 		// output into iteration
 		headConfig.setOutputSerializer(recSerializer);
@@ -175,13 +175,13 @@ public class CompensatableDanglingPageRank {
 		// --------------- the join ---------------------
 		
 		JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+			"IterationIntermediate", jobGraph, degreeOfParallelism);
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
 		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);
 		intermediateConfig.addInputToGroup(1);
 		intermediateConfig.setInputSerializer(recSerializer, 0);
@@ -203,7 +203,7 @@ public class CompensatableDanglingPageRank {
 		// ---------------- the tail (co group) --------------------
 		
 		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
 		tailConfig.setIterationId(ITERATION_ID);
         tailConfig.setIsWorksetUpdate();
@@ -220,10 +220,10 @@ public class CompensatableDanglingPageRank {
 		tailConfig.setDriverComparator(fieldZeroComparator, 1);
 		tailConfig.setDriverPairComparator(pairComparatorFactory);
 		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
 		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
 		tailConfig.setInputComparator(fieldZeroComparator, 1);
-		tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+		tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
 		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
 		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
 		
@@ -241,8 +241,7 @@ public class CompensatableDanglingPageRank {
 		
 		// --------------- the output ---------------------
 
-		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
-			numSubTasksPerInstance);
+		JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
 		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
 		outputConfig.addInputToGroup(0);
 		outputConfig.setInputSerializer(recSerializer, 0);
@@ -252,7 +251,7 @@ public class CompensatableDanglingPageRank {
 		// --------------- the auxiliaries ---------------------
 		
 		JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
-			degreeOfParallelism, numSubTasksPerInstance);
+			degreeOfParallelism);
 
 		JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index 3626ba7..0e297ec 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,6 +14,7 @@ package eu.stratosphere.test.localDistributed;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.net.URL;
 
 import eu.stratosphere.client.minicluster.NepheleMiniCluster;
 import org.junit.Assert;
@@ -28,6 +29,8 @@ import eu.stratosphere.util.LogUtils;
 
 public class PackagedProgramEndToEndITCase {
 
+	private static final int DOP = 4;
+
 	static {
 		LogUtils.initializeDefaultTestConsoleLogger();
 	}
@@ -53,16 +56,18 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.write(KMeansData.INITIAL_CENTERS);
 			fwClusters.close();
 
-			String jarPath = "target/maven-test-jar.jar";
+			URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
+			String jarPath = jarFileURL.getFile();
 
 			// run KMeans
-			cluster.setNumTaskManager(2);
+			cluster.setNumTaskTracker(2);
+			cluster.setTaskManagerNumSlots(2);
 			cluster.start();
 			RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
-			
+
 			ex.executeJar(jarPath,
-					"eu.stratosphere.test.util.testjar.KMeansForTest",
-					new String[] {
+					"eu.stratosphere.examples.scala.testing.KMeansForTest",
+					new String[] {new Integer(DOP).toString(),
 							points.toURI().toString(),
 							clusters.toURI().toString(),
 							outFile.toURI().toString(),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
index b9f0f2c..2637acb 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
@@ -44,6 +44,7 @@ public class UnionSinkITCase extends RecordAPITestBase {
 	
 	public UnionSinkITCase(Configuration testConfig) {
 		super(testConfig);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
@@ -115,7 +116,7 @@ public class UnionSinkITCase extends RecordAPITestBase {
 		output.addInput(testMapper2);
 		
 		Plan plan = new Plan(output);
-		plan.setDefaultParallelism(4);
+		plan.setDefaultParallelism(DOP);
 
 		PactCompiler pc = new PactCompiler(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
index 48f703a..1f30075 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
@@ -36,8 +36,14 @@ import eu.stratosphere.util.Collector;
  */
 public class CollectionSourceTest extends RecordAPITestBase {
 
+	private static final int DOP = 4;
+
 	protected String resultPath;
 
+	public CollectionSourceTest(){
+		setTaskManagerNumSlots(DOP);
+	}
+
 	public static class Join extends JoinFunction {
 
 		private static final long serialVersionUID = 1L;
@@ -110,7 +116,7 @@ public class CollectionSourceTest extends RecordAPITestBase {
 
 	@Override
 	protected Plan getTestJob() {
-		return getPlan(4, resultPath);
+		return getPlan(DOP, resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
index f52e108..7a86112 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
@@ -35,6 +35,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
 	
 	public ComputeEdgeDegreesITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -58,7 +59,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", 4);
+		config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
index b4b0386..68d66a6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
@@ -35,6 +35,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
 	
 	public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) {
 		super(config);
+		setTaskManagerNumSlots(DOP);
 	}
 
 	@Override
@@ -60,7 +61,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("EnumTrianglesTest#NumSubtasks", 4);
+		config.setInteger("EnumTrianglesTest#NumSubtasks", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
index 945cc67..93e9bfe 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
@@ -54,7 +54,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
 	protected Plan getTestJob() {
 		EnumTrianglesRdfFoaf enumTriangles = new EnumTrianglesRdfFoaf();
 		return enumTriangles.getPlan(
-				config.getString("EnumTrianglesTest#NoSubtasks", "4"), edgesPath, resultPath);
+				config.getString("EnumTrianglesTest#NoSubtasks", new Integer(DOP).toString()), edgesPath, resultPath);
 	}
 
 	@Override
@@ -65,7 +65,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
 		Configuration config = new Configuration();
-		config.setInteger("EnumTrianglesTest#NoSubtasks", 4);
+		config.setInteger("EnumTrianglesTest#NoSubtasks", DOP);
 		return toParameterList(config);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
index 7aa9a78..f628ca5 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
@@ -38,6 +38,9 @@ public class GlobalSortingITCase extends RecordAPITestBase {
 
 	private String sortedRecords;
 
+	public GlobalSortingITCase(){
+		setTaskManagerNumSlots(DOP);
+	}
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -77,7 +80,7 @@ public class GlobalSortingITCase extends RecordAPITestBase {
 	@Override
 	protected Plan getTestJob() {
 		GlobalSort globalSort = new GlobalSort();
-		return globalSort.getPlan("4", recordsPath, resultPath);
+		return globalSort.getPlan(new Integer(DOP).toString(), recordsPath, resultPath);
 	}
 
 	@Override