You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:23 UTC
[02/22] Rework the TaskManager to a slot-based model and remove
legacy cloud code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
index feb5f34..95d75fb 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
@@ -39,7 +39,7 @@ public class TransitiveClosureNaiveITCase extends RecordAPITestBase {
protected Plan getTestJob() {
TransitiveClosureNaive transitiveClosureNaive = new TransitiveClosureNaive();
// "2" is the number of iterations here
- return transitiveClosureNaive.getScalaPlan(4, 2, verticesPath, edgesPath, resultPath);
+ return transitiveClosureNaive.getScalaPlan(DOP, 2, verticesPath, edgesPath, resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
index 2ddef0c..67c5ce1 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WebLogAnalysisITCase.java
@@ -21,6 +21,6 @@ public class WebLogAnalysisITCase extends eu.stratosphere.test.recordJobTests.We
@Override
protected Plan getTestJob() {
WebLogAnalysis webLogAnalysis = new WebLogAnalysis();
- return webLogAnalysis.getScalaPlan(4, docsPath, ranksPath, visitsPath, resultPath);
+ return webLogAnalysis.getScalaPlan(DOP, docsPath, ranksPath, visitsPath, resultPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
index 205828f..42ee31a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountITCase.java
@@ -22,6 +22,6 @@ public class WordCountITCase extends eu.stratosphere.test.recordJobTests.WordCou
@Override
protected Plan getTestJob() {
WordCount wc = new WordCount();
- return wc.getScalaPlan(4, textPath, resultPath);
+ return wc.getScalaPlan(DOP, textPath, resultPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
index 4627d48..44b37f6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountPactValueITCase.java
@@ -22,6 +22,6 @@ public class WordCountPactValueITCase extends eu.stratosphere.test.recordJobTest
@Override
protected Plan getTestJob() {
WordCountWithUserDefinedType wc = new WordCountWithUserDefinedType();
- return wc.getScalaPlan(4, textPath, resultPath);
+ return wc.getScalaPlan(DOP, textPath, resultPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
index 12082ac..a8eba29 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
@@ -20,6 +20,6 @@ public class WordCountWithCountFunctionITCase extends eu.stratosphere.test.recor
@Override
protected Plan getTestJob() {
- return new WordCountWithCount().getScalaPlan(4, textPath, resultPath);
+ return new WordCountWithCount().getScalaPlan(DOP, textPath, resultPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
index 3f4791e..4f5d38d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/failingPrograms/TaskFailureITCase.java
@@ -36,6 +36,8 @@ import eu.stratosphere.util.Collector;
*/
public class TaskFailureITCase extends FailingTestBase {
+ private static final int DOP = 4;
+
// input for map tasks
private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n" +
@@ -47,6 +49,10 @@ public class TaskFailureITCase extends FailingTestBase {
private String inputPath;
private String resultPath;
+
+ public TaskFailureITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -73,7 +79,7 @@ public class TaskFailureITCase extends FailingTestBase {
// generate plan
Plan plan = new Plan(output);
- plan.setDefaultParallelism(4);
+ plan.setDefaultParallelism(DOP);
// optimize and compile plan
PactCompiler pc = new PactCompiler(new DataStatistics());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
index c937435..f8d82ae 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -50,6 +50,10 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
+
+ public CoGroupConnectedComponentsITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
@@ -61,7 +65,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getPlan(4, verticesPath, edgesPath, resultPath, 100);
+ return getPlan(DOP, verticesPath, edgesPath, resultPath, 100);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
index 9c88bb5..53feae6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsITCase.java
@@ -33,6 +33,10 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
+
+ public ConnectedComponentsITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
@@ -45,7 +49,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
WorksetConnectedComponents cc = new WorksetConnectedComponents();
- return cc.getPlan("4", verticesPath, edgesPath, resultPath, "100");
+ return cc.getPlan(new Integer(DOP).toString(), verticesPath, edgesPath, resultPath, "100");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index 8de877c..eb2eec0 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -59,6 +59,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -71,7 +72,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
@Override
protected Plan getTestJob() {
boolean extraMapper = config.getBoolean("ExtraMapper", false);
- return getPlan(4, verticesPath, edgesPath, resultPath, 100, extraMapper);
+ return getPlan(DOP, verticesPath, edgesPath, resultPath, 100, extraMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index 5c696a2..f040f06 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -51,6 +51,10 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
protected String verticesPath;
protected String edgesPath;
protected String resultPath;
+
+ public ConnectedComponentsWithSolutionSetFirstITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
@@ -62,7 +66,8 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
@Override
protected Plan getTestJob() {
- return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(4, verticesPath, edgesPath, resultPath, 100);
+ return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(DOP, verticesPath, edgesPath,
+ resultPath, 100);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
index e84f21e..5390eed 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DanglingPageRankITCase.java
@@ -35,6 +35,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
public DanglingPageRankITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@@ -63,7 +64,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("PageRankITCase#NoSubtasks", 4);
+ config1.setInteger("PageRankITCase#NoSubtasks", DOP);
config1.setString("PageRankITCase#NumIterations", "25");
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
index 8f06929..1eb81ed 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DeltaPageRankITCase.java
@@ -36,6 +36,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
public DeltaPageRankITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -66,7 +67,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("NumSubtasks", 4);
+ config1.setInteger("NumSubtasks", DOP);
config1.setInteger("NumIterations", 3);
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
index deda551..e1339e3 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/DependencyConnectedComponentsITCase.java
@@ -28,7 +28,6 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
-
/**
*
* Iterative Connected Components test case which recomputes only the elements
@@ -46,6 +45,10 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
private String resultPath;
private String expectedResult;
+
+ public DependencyConnectedComponentsITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
index 3c38263..50c1970 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.java
@@ -40,6 +40,10 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
protected String dataPath;
protected String resultPath;
+
+ public IterationTerminationWithTerminationTail(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -54,7 +58,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getTestPlanPlan(4, dataPath, resultPath);
+ return getTestPlanPlan(DOP, dataPath, resultPath);
}
private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
index f3c6cbb..cfe0510 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationTerminationWithTwoTails.java
@@ -42,6 +42,10 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
protected String dataPath;
protected String resultPath;
+ public IterationTerminationWithTwoTails(){
+ setTaskManagerNumSlots(DOP);
+ }
+
@Override
protected void preSubmit() throws Exception {
dataPath = createTempFile("datapoints.txt", INPUT);
@@ -55,7 +59,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getTestPlanPlan(4, dataPath, resultPath);
+ return getTestPlanPlan(DOP, dataPath, resultPath);
}
private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
index c7e28e2..7085776 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithAllReducerITCase.java
@@ -40,7 +40,9 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
protected String dataPath;
protected String resultPath;
-
+ public IterationWithAllReducerITCase(){
+ setTaskManagerNumSlots(4);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -55,7 +57,7 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- Plan plan = getTestPlanPlan(4, dataPath, resultPath);
+ Plan plan = getTestPlanPlan(DOP, dataPath, resultPath);
return plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
index 55c7a35..dba561d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithChainingITCase.java
@@ -46,6 +46,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
public IterationWithChainingITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -69,7 +70,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("ChainedMapperITCase#NoSubtasks", 4);
+ config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
index 1272f6e..884dd64 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterationWithUnionITCase.java
@@ -46,6 +46,7 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
public IterationWithUnionITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -67,7 +68,7 @@ public class IterationWithUnionITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("IterationWithUnionITCase#NumSubtasks", 4);
+ config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP);
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
index c8da45f..199f108 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java
@@ -27,6 +27,10 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
protected String dataPath;
protected String clusterPath;
protected String resultPath;
+
+ public IterativeKMeansITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -38,7 +42,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan("4", dataPath, clusterPath, resultPath, "20");
+ return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
index 7085c3c..d67da35 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/KMeansITCase.java
@@ -27,7 +27,11 @@ public class KMeansITCase extends RecordAPITestBase {
protected String dataPath;
protected String clusterPath;
protected String resultPath;
-
+
+ public KMeansITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
+
@Override
protected void preSubmit() throws Exception {
dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
@@ -38,7 +42,7 @@ public class KMeansITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan("4", dataPath, clusterPath, resultPath, "20");
+ return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
index f38fb7e..7c68605 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/LineRankITCase.java
@@ -54,6 +54,7 @@ public class LineRankITCase extends RecordAPITestBase {
public LineRankITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -68,7 +69,7 @@ public class LineRankITCase extends RecordAPITestBase {
LineRank lr = new LineRank();
Plan plan = lr.getScalaPlan(
- config.getInteger("NumSubtasks", 1),
+ config.getInteger("NumSubtasks", 1),
sourcesPath,
targetsPath,
9,
@@ -79,7 +80,7 @@ public class LineRankITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("NumSubtasks", 4);
+ config1.setInteger("NumSubtasks", DOP);
config1.setInteger("NumIterations", 5);
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
index 8b12b53..58d8170 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/PageRankITCase.java
@@ -38,6 +38,7 @@ public class PageRankITCase extends RecordAPITestBase {
public PageRankITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -64,7 +65,7 @@ public class PageRankITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("NumSubtasks", 4);
+ config1.setInteger("NumSubtasks", DOP);
config1.setString("NumIterations", "5");
return toParameterList(config1);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 7eff1aa..b2b3df2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -82,6 +82,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
private static final long MEM_PER_CONSUMER = 3;
+ private static final int DOP = 4;
+
+ private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+
protected String verticesPath;
protected String edgesPath;
@@ -90,6 +94,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
public ConnectedComponentsNepheleITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Parameters
@@ -118,20 +123,19 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
@Override
protected JobGraph getJobGraph() throws Exception {
- int dop = 4;
int maxIterations = 100;
int type = config.getInteger("testcase", 0);
switch (type) {
case 1:
- return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, dop, maxIterations);
+ return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
case 2:
- return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, dop, maxIterations);
+ return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
case 3:
- return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, dop,
+ return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, DOP,
maxIterations);
case 4:
- return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, dop,
+ return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, DOP,
maxIterations);
default:
throw new RuntimeException("Broken test configuration");
@@ -167,7 +171,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
@SuppressWarnings("unchecked")
CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
JobInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
- jobGraph, numSubTasks, numSubTasks);
+ jobGraph, numSubTasks);
TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
{
verticesInputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -199,7 +203,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
@SuppressWarnings("unchecked")
CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class);
JobInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
- numSubTasks, numSubTasks);
+ numSubTasks);
TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration());
{
edgesInputConfig.setOutputSerializer(serializer);
@@ -216,7 +220,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
TypePairComparatorFactory<?, ?> pairComparator) {
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)",
- jobGraph, numSubTasks, numSubTasks);
+ jobGraph, numSubTasks);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
{
headConfig.setIterationId(ITERATION_ID);
@@ -234,7 +238,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
headConfig.setInputComparator(comparator, 1);
headConfig.setInputLocalStrategy(1, LocalStrategy.NONE);
headConfig.setInputCached(1, true);
- headConfig.setInputMaterializationMemory(1, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeInputMaterializationMemory(1, MEM_FRAC_PER_CONSUMER);
// initial solution set input
headConfig.addInputToGroup(2);
@@ -248,8 +252,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// back channel / iterations
headConfig.setIsWorksetIteration();
- headConfig.setBackChannelMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
- headConfig.setSolutionSetMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeBackChannelMemory(MEM_FRAC_PER_CONSUMER);
+ headConfig.setRelativeSolutionSetMemory(MEM_FRAC_PER_CONSUMER );
// output into iteration
headConfig.setOutputSerializer(serializer);
@@ -273,7 +277,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
headConfig.setDriverComparator(comparator, 0);
headConfig.setDriverComparator(comparator, 1);
headConfig.setDriverPairComparator(pairComparator);
- headConfig.setMemoryDriver(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeMemoryDriver(MEM_FRAC_PER_CONSUMER);
headConfig.addIterationAggregator(
WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
@@ -288,7 +292,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// --------------- the intermediate (reduce to min id) ---------------
JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Find Min Component-ID", jobGraph, numSubTasks, numSubTasks);
+ "Find Min Component-ID", jobGraph, numSubTasks);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
{
intermediateConfig.setIterationId(ITERATION_ID);
@@ -297,7 +301,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
intermediateConfig.setInputSerializer(serializer, 0);
intermediateConfig.setInputComparator(comparator, 0);
intermediateConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- intermediateConfig.setMemoryInput(0, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ intermediateConfig.setRelativeMemoryInput(0, MEM_FRAC_PER_CONSUMER);
intermediateConfig.setFilehandlesInput(0, 64);
intermediateConfig.setSpillingThresholdInput(0, 0.85f);
@@ -316,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
TypeSerializerFactory<?> serializer) {
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks, numSubTasks);
+ JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
{
@@ -341,7 +345,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
private static JobOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
JobOutputVertex fakeTailOutput =
- JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks);
+ JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
return fakeTailOutput;
}
@@ -389,7 +393,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// --------------- the tail (solution set join) ---------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- numSubTasks, numSubTasks);
+ numSubTasks);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
{
tailConfig.setIterationId(ITERATION_ID);
@@ -480,7 +484,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// ------------------ the intermediate (ss join) ----------------------
JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Solution Set Join", jobGraph, numSubTasks, numSubTasks);
+ "Solution Set Join", jobGraph, numSubTasks);
TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
{
ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
@@ -509,7 +513,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// -------------------------- ss tail --------------------------------
JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail",
- jobGraph, numSubTasks, numSubTasks);
+ jobGraph, numSubTasks);
TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
{
ssTailConfig.setIterationId(ITERATION_ID);
@@ -520,7 +524,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
ssTailConfig.addInputToGroup(0);
ssTailConfig.setInputSerializer(serializer, 0);
ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
- ssTailConfig.setInputMaterializationMemory(0, MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
// output
ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
@@ -534,7 +538,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// -------------------------- ws tail --------------------------------
JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
- jobGraph, numSubTasks, numSubTasks);
+ jobGraph, numSubTasks);
TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
{
wsTailConfig.setIterationId(ITERATION_ID);
@@ -631,7 +635,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// ------------------ the intermediate (ws update) ----------------------
JobTaskVertex wsUpdateIntermediate =
JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph,
- numSubTasks, numSubTasks);
+ numSubTasks);
TaskConfig wsUpdateConfig = new TaskConfig(wsUpdateIntermediate.getConfiguration());
{
wsUpdateConfig.setIterationId(ITERATION_ID);
@@ -661,7 +665,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// -------------------------- ss tail --------------------------------
JobTaskVertex ssTail =
JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph,
- numSubTasks, numSubTasks);
+ numSubTasks);
TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
{
ssTailConfig.setIterationId(ITERATION_ID);
@@ -754,7 +758,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// ------------------ the intermediate (ss update) ----------------------
JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Solution Set Update", jobGraph, numSubTasks, numSubTasks);
+ "Solution Set Update", jobGraph, numSubTasks);
TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
{
ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
@@ -782,7 +786,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// -------------------------- ws tail --------------------------------
JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
- jobGraph, numSubTasks, numSubTasks);
+ jobGraph, numSubTasks);
TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
{
wsTailConfig.setIterationId(ITERATION_ID);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
index 872cabc..002533a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankNepheleITCase.java
@@ -34,6 +34,10 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
protected String edgesPath;
protected String resultPath;
+ public DanglingPageRankNepheleITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
+
@Override
protected void preSubmit() throws Exception {
@@ -45,8 +49,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
@Override
protected JobGraph getJobGraph() throws Exception {
String[] parameters = new String[] {
- "4",
- "4",
+ new Integer(DOP).toString(),
pagesWithRankPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
index 59d5206..e42620e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
@@ -22,6 +22,10 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
protected String pagesWithRankPath;
protected String edgesPath;
protected String resultPath;
+
+ public DanglingPageRankWithCombinerNepheleITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -33,8 +37,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
@Override
protected JobGraph getJobGraph() throws Exception {
String[] parameters = new String[] {
- "4",
- "4",
+ new Integer(DOP).toString(),
pagesWithRankPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index ef7c9d2..8a68402 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -75,6 +75,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
public IterationWithChainingNepheleITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -94,7 +95,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
@Parameterized.Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
- config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", 2);
+ config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", DOP);
config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
return toParameterList(config);
}
@@ -118,8 +119,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
final TypeComparatorFactory<Record> comparator =
new RecordComparatorFactory(new int[] { 0 }, new Class[] { IntValue.class });
- final long MEM_PER_CONSUMER = 2;
-
final int ITERATION_ID = 1;
// --------------------------------------------------------------------------------------------------------------
@@ -128,7 +127,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
// - input -----------------------------------------------------------------------------------------------------
JobInputVertex input = JobGraphUtils.createInput(
- new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks, numSubTasks);
+ new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
{
inputConfig.setOutputSerializer(serializer);
@@ -137,7 +136,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
// - head ------------------------------------------------------------------------------------------------------
JobTaskVertex head = JobGraphUtils.createTask(
- IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks);
+ IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
{
headConfig.setIterationId(ITERATION_ID);
@@ -168,12 +167,12 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
headConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
// back channel
- headConfig.setBackChannelMemory(MEM_PER_CONSUMER * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeBackChannelMemory(1.0);
}
// - tail ------------------------------------------------------------------------------------------------------
JobTaskVertex tail = JobGraphUtils.createTask(
- IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks, numSubTasks);
+ IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
{
tailConfig.setIterationId(ITERATION_ID);
@@ -210,7 +209,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
}
// - output ----------------------------------------------------------------------------------------------------
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
+ JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
{
outputConfig.addInputToGroup(0);
@@ -221,7 +220,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
}
// - fake tail -------------------------------------------------------------------------------------------------
- JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks, numSubTasks);
+ JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
// - sync ------------------------------------------------------------------------------------------------------
JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index a229086..109c91a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -51,14 +51,14 @@ public class JobGraphUtils {
}
public static <T extends FileInputFormat<?>> JobInputVertex createInput(T stub, String path, String name, JobGraph graph,
- int degreeOfParallelism, int numSubTasksPerInstance)
+ int degreeOfParallelism)
{
stub.setFilePath(path);
- return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism, numSubTasksPerInstance);
+ return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
}
private static <T extends InputFormat<?,?>> JobInputVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
- int degreeOfParallelism, int numSubTasksPerInstance)
+ int degreeOfParallelism)
{
JobInputVertex inputVertex = new JobInputVertex(name, graph);
@@ -67,8 +67,7 @@ public class JobGraphUtils {
inputVertex.setInputClass(clazz);
inputVertex.setNumberOfSubtasks(degreeOfParallelism);
- inputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
-
+
TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
inputConfig.setStubWrapper(stub);
@@ -89,12 +88,11 @@ public class JobGraphUtils {
}
public static JobTaskVertex createTask(@SuppressWarnings("rawtypes") Class<? extends RegularPactTask> task, String name, JobGraph graph,
- int degreeOfParallelism, int numSubtasksPerInstance)
+ int degreeOfParallelism)
{
JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
taskVertex.setTaskClass(task);
taskVertex.setNumberOfSubtasks(degreeOfParallelism);
- taskVertex.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
return taskVertex;
}
@@ -107,23 +105,19 @@ public class JobGraphUtils {
return sync;
}
- public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism,
- int numSubTasksPerInstance)
+ public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
{
JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
outputVertex.setOutputClass(FakeOutputTask.class);
outputVertex.setNumberOfSubtasks(degreeOfParallelism);
- outputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
return outputVertex;
}
- public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism,
- int numSubTasksPerInstance)
+ public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
{
JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
sinkVertex.setOutputClass(DataSinkTask.class);
sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
- sinkVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
return sinkVertex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index b112741..5b2433e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -92,7 +92,6 @@ public class CustomCompensatableDanglingPageRank {
public static JobGraph getJobGraph(String[] args) throws Exception {
int degreeOfParallelism = 2;
- int numSubTasksPerInstance = degreeOfParallelism;
String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -109,31 +108,32 @@ public class CustomCompensatableDanglingPageRank {
int failingIteration = 2;
double messageLoss = 0.75;
- if (args.length >= 15) {
+ if (args.length >= 14) {
degreeOfParallelism = Integer.parseInt(args[0]);
- numSubTasksPerInstance = Integer.parseInt(args[1]);
- pageWithRankInputPath = args[2];
- adjacencyListInputPath = args[3];
- outputPath = args[4];
-// confPath = args[5];
- minorConsumer = Integer.parseInt(args[6]);
- matchMemory = Integer.parseInt(args[7]);
- coGroupSortMemory = Integer.parseInt(args[8]);
- numIterations = Integer.parseInt(args[9]);
- numVertices = Long.parseLong(args[10]);
- numDanglingVertices = Long.parseLong(args[11]);
- failingWorkers = args[12];
- failingIteration = Integer.parseInt(args[13]);
- messageLoss = Double.parseDouble(args[14]);
+ pageWithRankInputPath = args[1];
+ adjacencyListInputPath = args[2];
+ outputPath = args[3];
+// confPath = args[4];
+ minorConsumer = Integer.parseInt(args[5]);
+ matchMemory = Integer.parseInt(args[6]);
+ coGroupSortMemory = Integer.parseInt(args[7]);
+ numIterations = Integer.parseInt(args[8]);
+ numVertices = Long.parseLong(args[9]);
+ numDanglingVertices = Long.parseLong(args[10]);
+ failingWorkers = args[11];
+ failingIteration = Integer.parseInt(args[12]);
+ messageLoss = Double.parseDouble(args[13]);
}
+ int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
+
JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
// --------------- the inputs ---------------------
// page rank input
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -142,7 +142,7 @@ public class CustomCompensatableDanglingPageRank {
// edges as adjacency list
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -150,7 +150,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the head ---------------------
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -160,12 +160,12 @@ public class CustomCompensatableDanglingPageRank {
headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeMemoryInput(0, (double) minorConsumer / totalMemoryConsumption);
headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
// back channel / iterations
- headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeBackChannelMemory((double) minorConsumer / totalMemoryConsumption);
// output into iteration
headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
@@ -195,13 +195,13 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the join ---------------------
JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ "IterationIntermediate", jobGraph, degreeOfParallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+ intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
intermediateConfig.addInputToGroup(1);
intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -223,7 +223,7 @@ public class CustomCompensatableDanglingPageRank {
// ---------------- the tail (co group) --------------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -240,10 +240,10 @@ public class CustomCompensatableDanglingPageRank {
tailConfig.setDriverComparator(vertexWithRankComparator, 1);
tailConfig.setDriverPairComparator(coGroupComparator);
tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
tailConfig.setInputComparator(vertexWithRankComparator, 1);
- tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeMemoryInput(1, (double) coGroupSortMemory / totalMemoryConsumption);
tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
@@ -261,8 +261,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the output ---------------------
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
- numSubTasksPerInstance);
+ JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -272,7 +271,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the auxiliaries ---------------------
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 37fab39..698296a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -93,7 +93,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
public static JobGraph getJobGraph(String[] args) throws Exception {
int degreeOfParallelism = 2;
- int numSubTasksPerInstance = degreeOfParallelism;
String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -109,31 +108,32 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
int failingIteration = 2;
double messageLoss = 0.75;
- if (args.length >= 15) {
+ if (args.length >= 14) {
degreeOfParallelism = Integer.parseInt(args[0]);
- numSubTasksPerInstance = Integer.parseInt(args[1]);
- pageWithRankInputPath = args[2];
- adjacencyListInputPath = args[3];
- outputPath = args[4];
- // [5] is config path
- minorConsumer = Integer.parseInt(args[6]);
- matchMemory = Integer.parseInt(args[7]);
- coGroupSortMemory = Integer.parseInt(args[8]);
- numIterations = Integer.parseInt(args[9]);
- numVertices = Long.parseLong(args[10]);
- numDanglingVertices = Long.parseLong(args[11]);
- failingWorkers = args[12];
- failingIteration = Integer.parseInt(args[13]);
- messageLoss = Double.parseDouble(args[14]);
+ pageWithRankInputPath = args[1];
+ adjacencyListInputPath = args[2];
+ outputPath = args[3];
+ // [4] is config path
+ minorConsumer = Integer.parseInt(args[5]);
+ matchMemory = Integer.parseInt(args[6]);
+ coGroupSortMemory = Integer.parseInt(args[7]);
+ numIterations = Integer.parseInt(args[8]);
+ numVertices = Long.parseLong(args[9]);
+ numDanglingVertices = Long.parseLong(args[10]);
+ failingWorkers = args[11];
+ failingIteration = Integer.parseInt(args[12]);
+ messageLoss = Double.parseDouble(args[13]);
}
+ int totalMemoryConsumption = 3*minorConsumer + 2*coGroupSortMemory + matchMemory;
+
JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
// --------------- the inputs ---------------------
// page rank input
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -142,7 +142,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// edges as adjacency list
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -150,7 +150,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the head ---------------------
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -160,12 +160,12 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
// back channel / iterations
- headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
// output into iteration
headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
@@ -195,13 +195,13 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the join ---------------------
JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ "IterationIntermediate", jobGraph, degreeOfParallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+ intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
intermediateConfig.addInputToGroup(1);
intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -225,7 +225,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
- combinerConfig.setMemoryDriver(coGroupSortMemory * JobGraphUtils.MEGABYTE);
+ combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
combinerConfig.setOutputSerializer(vertexWithRankSerializer);
combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
combinerConfig.setOutputComparator(vertexWithRankComparator, 0);
@@ -235,7 +235,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// ---------------- the tail (co group) --------------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -251,10 +251,10 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
tailConfig.setDriverComparator(vertexWithRankComparator, 1);
tailConfig.setDriverPairComparator(coGroupComparator);
tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
tailConfig.setInputComparator(vertexWithRankComparator, 1);
- tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -273,8 +273,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the output ---------------------
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
- numSubTasksPerInstance);
+ JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -284,7 +283,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the auxiliaries ---------------------
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index 944f13b..f870fe6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -72,7 +72,6 @@ public class CompensatableDanglingPageRank {
public static JobGraph getJobGraph(String[] args) throws Exception {
int degreeOfParallelism = 2;
- int numSubTasksPerInstance = degreeOfParallelism;
String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -91,29 +90,30 @@ public class CompensatableDanglingPageRank {
if (args.length >= 15) {
degreeOfParallelism = Integer.parseInt(args[0]);
- numSubTasksPerInstance = Integer.parseInt(args[1]);
- pageWithRankInputPath = args[2];
- adjacencyListInputPath = args[3];
- outputPath = args[4];
-// confPath = args[5];
- minorConsumer = Integer.parseInt(args[6]);
- matchMemory = Integer.parseInt(args[7]);
- coGroupSortMemory = Integer.parseInt(args[8]);
- numIterations = Integer.parseInt(args[9]);
- numVertices = Long.parseLong(args[10]);
- numDanglingVertices = Long.parseLong(args[11]);
- failingWorkers = args[12];
- failingIteration = Integer.parseInt(args[13]);
- messageLoss = Double.parseDouble(args[14]);
+ pageWithRankInputPath = args[1];
+ adjacencyListInputPath = args[2];
+ outputPath = args[3];
+// confPath = args[4];
+ minorConsumer = Integer.parseInt(args[5]);
+ matchMemory = Integer.parseInt(args[6]);
+ coGroupSortMemory = Integer.parseInt(args[7]);
+ numIterations = Integer.parseInt(args[8]);
+ numVertices = Long.parseLong(args[9]);
+ numDanglingVertices = Long.parseLong(args[10]);
+ failingWorkers = args[11];
+ failingIteration = Integer.parseInt(args[12]);
+ messageLoss = Double.parseDouble(args[13]);
}
+ int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
+
JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
// --------------- the inputs ---------------------
// page rank input
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
@@ -122,7 +122,7 @@ public class CompensatableDanglingPageRank {
// edges as adjacency list
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(recSerializer);
@@ -130,7 +130,7 @@ public class CompensatableDanglingPageRank {
// --------------- the head ---------------------
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -140,12 +140,12 @@ public class CompensatableDanglingPageRank {
headConfig.setInputSerializer(recSerializer, 0);
headConfig.setInputComparator(fieldZeroComparator, 0);
headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
// back channel / iterations
- headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
+ headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
// output into iteration
headConfig.setOutputSerializer(recSerializer);
@@ -175,13 +175,13 @@ public class CompensatableDanglingPageRank {
// --------------- the join ---------------------
JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
+ "IterationIntermediate", jobGraph, degreeOfParallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
+ intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
intermediateConfig.addInputToGroup(1);
intermediateConfig.setInputSerializer(recSerializer, 0);
@@ -203,7 +203,7 @@ public class CompensatableDanglingPageRank {
// ---------------- the tail (co group) --------------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -220,10 +220,10 @@ public class CompensatableDanglingPageRank {
tailConfig.setDriverComparator(fieldZeroComparator, 1);
tailConfig.setDriverPairComparator(pairComparatorFactory);
tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
tailConfig.setInputComparator(fieldZeroComparator, 1);
- tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
+ tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
@@ -241,8 +241,7 @@ public class CompensatableDanglingPageRank {
// --------------- the output ---------------------
- JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
- numSubTasksPerInstance);
+ JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(recSerializer, 0);
@@ -252,7 +251,7 @@ public class CompensatableDanglingPageRank {
// --------------- the auxiliaries ---------------------
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
- degreeOfParallelism, numSubTasksPerInstance);
+ degreeOfParallelism);
JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index 3626ba7..0e297ec 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,6 +14,7 @@ package eu.stratosphere.test.localDistributed;
import java.io.File;
import java.io.FileWriter;
+import java.net.URL;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import org.junit.Assert;
@@ -28,6 +29,8 @@ import eu.stratosphere.util.LogUtils;
public class PackagedProgramEndToEndITCase {
+ private static final int DOP = 4;
+
static {
LogUtils.initializeDefaultTestConsoleLogger();
}
@@ -53,16 +56,18 @@ public class PackagedProgramEndToEndITCase {
fwClusters.write(KMeansData.INITIAL_CENTERS);
fwClusters.close();
- String jarPath = "target/maven-test-jar.jar";
+ URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
+ String jarPath = jarFileURL.getFile();
// run KMeans
- cluster.setNumTaskManager(2);
+ cluster.setNumTaskTracker(2);
+ cluster.setTaskManagerNumSlots(2);
cluster.start();
RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
-
+
ex.executeJar(jarPath,
- "eu.stratosphere.test.util.testjar.KMeansForTest",
- new String[] {
+ "eu.stratosphere.examples.scala.testing.KMeansForTest",
+ new String[] {new Integer(DOP).toString(),
points.toURI().toString(),
clusters.toURI().toString(),
outFile.toURI().toString(),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
index b9f0f2c..2637acb 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/UnionSinkITCase.java
@@ -44,6 +44,7 @@ public class UnionSinkITCase extends RecordAPITestBase {
public UnionSinkITCase(Configuration testConfig) {
super(testConfig);
+ setTaskManagerNumSlots(DOP);
}
private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
@@ -115,7 +116,7 @@ public class UnionSinkITCase extends RecordAPITestBase {
output.addInput(testMapper2);
Plan plan = new Plan(output);
- plan.setDefaultParallelism(4);
+ plan.setDefaultParallelism(DOP);
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
index 48f703a..1f30075 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/CollectionSourceTest.java
@@ -36,8 +36,14 @@ import eu.stratosphere.util.Collector;
*/
public class CollectionSourceTest extends RecordAPITestBase {
+ private static final int DOP = 4;
+
protected String resultPath;
+ public CollectionSourceTest(){
+ setTaskManagerNumSlots(DOP);
+ }
+
public static class Join extends JoinFunction {
private static final long serialVersionUID = 1L;
@@ -110,7 +116,7 @@ public class CollectionSourceTest extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getPlan(4, resultPath);
+ return getPlan(DOP, resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
index f52e108..7a86112 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/ComputeEdgeDegreesITCase.java
@@ -35,6 +35,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
public ComputeEdgeDegreesITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -58,7 +59,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
- config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", 4);
+ config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP);
return toParameterList(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
index b4b0386..68d66a6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java
@@ -35,6 +35,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) {
super(config);
+ setTaskManagerNumSlots(DOP);
}
@Override
@@ -60,7 +61,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
- config.setInteger("EnumTrianglesTest#NumSubtasks", 4);
+ config.setInteger("EnumTrianglesTest#NumSubtasks", DOP);
return toParameterList(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
index 945cc67..93e9bfe 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/EnumTrianglesRDFITCase.java
@@ -54,7 +54,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
protected Plan getTestJob() {
EnumTrianglesRdfFoaf enumTriangles = new EnumTrianglesRdfFoaf();
return enumTriangles.getPlan(
- config.getString("EnumTrianglesTest#NoSubtasks", "4"), edgesPath, resultPath);
+ config.getString("EnumTrianglesTest#NoSubtasks", new Integer(DOP).toString()), edgesPath, resultPath);
}
@Override
@@ -65,7 +65,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
- config.setInteger("EnumTrianglesTest#NoSubtasks", 4);
+ config.setInteger("EnumTrianglesTest#NoSubtasks", DOP);
return toParameterList(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
index 7aa9a78..f628ca5 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobTests/GlobalSortingITCase.java
@@ -38,6 +38,9 @@ public class GlobalSortingITCase extends RecordAPITestBase {
private String sortedRecords;
+ public GlobalSortingITCase(){
+ setTaskManagerNumSlots(DOP);
+ }
@Override
protected void preSubmit() throws Exception {
@@ -77,7 +80,7 @@ public class GlobalSortingITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
GlobalSort globalSort = new GlobalSort();
- return globalSort.getPlan("4", recordsPath, resultPath);
+ return globalSort.getPlan(new Integer(DOP).toString(), recordsPath, resultPath);
}
@Override