You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:23 UTC
[3/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
index 8a324a3..8197e27 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MatchJoinCancelingITCase.java
@@ -37,10 +37,10 @@ import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class MatchJoinCancelingITCase extends CancellingTestBase {
- private static final int DOP = 4;
+ private static final int parallelism = 4;
public MatchJoinCancelingITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
@@ -60,7 +60,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 3000, 10*1000);
}
@@ -81,7 +81,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 5000, 10*1000);
}
@@ -102,7 +102,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 5000);
@@ -129,7 +129,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 30 * 1000, 30 * 1000);
}
@@ -157,7 +157,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 10 * 1000, 20 * 1000);
}
@@ -183,7 +183,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
GenericDataSink sink = new GenericDataSink(new DiscardingOutputFormat(), matcher, "Sink");
Plan p = new Plan(sink);
- p.setDefaultParallelism(DOP);
+ p.setDefaultParallelism(parallelism);
runAndCancelJob(p, 10 * 1000, 10 * 1000);
}
@@ -191,7 +191,7 @@ public class MatchJoinCancelingITCase extends CancellingTestBase {
// -------------------------------------- Test System corner cases ---------------------------------
// @Test
- public void testCancelSortMatchWithHighDOP() throws Exception {
+ public void testCancelSortMatchWithHighparallelism() throws Exception {
GenericDataSource<InfiniteIntegerInputFormat> source1 =
new GenericDataSource<InfiniteIntegerInputFormat>(new InfiniteIntegerInputFormat(), "Source 1");
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 53b70ad..62e2893 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -30,7 +30,7 @@ import org.junit.Test;
public class LocalExecutorITCase {
- private static final int DOP = 4;
+ private static final int parallelism = 4;
@Test
public void testLocalExecutorWithWordCount() {
@@ -50,11 +50,11 @@ public class LocalExecutorITCase {
LocalExecutor executor = new LocalExecutor();
executor.setDefaultOverwriteFiles(true);
- executor.setTaskManagerNumSlots(DOP);
+ executor.setTaskManagerNumSlots(parallelism);
executor.setPrintStatusDuringExecution(false);
executor.start();
- executor.executePlan(wc.getPlan(Integer.valueOf(DOP).toString(), inFile.toURI().toString(),
+ executor.executePlan(wc.getPlan(Integer.valueOf(parallelism).toString(), inFile.toURI().toString(),
outFile.toURI().toString()));
executor.stop();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
index a69b5c2..5c1c500 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
@@ -28,7 +28,7 @@ public class WordCountITCase extends JavaProgramTestBase {
protected String resultPath;
public WordCountITCase(){
- setDegreeOfParallelism(4);
+ setParallelism(4);
setNumTaskManagers(2);
setTaskManagerNumSlots(2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index 9021c6a..0d2c469 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
@SuppressWarnings("deprecation")
public class TaskFailureITCase extends FailingTestBase {
- private static final int DOP = 4;
+ private static final int parallelism = 4;
// input for map tasks
private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
@@ -57,7 +57,7 @@ public class TaskFailureITCase extends FailingTestBase {
private String resultPath;
public TaskFailureITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -85,7 +85,7 @@ public class TaskFailureITCase extends FailingTestBase {
// generate plan
Plan plan = new Plan(output);
- plan.setDefaultParallelism(DOP);
+ plan.setDefaultParallelism(parallelism);
// optimize and compile plan
Optimizer pc = new Optimizer(new DataStatistics());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
index 0a4673a..9023a1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java
@@ -58,7 +58,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
protected String resultPath;
public CoGroupConnectedComponentsITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -71,7 +71,7 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getPlan(DOP, verticesPath, edgesPath, resultPath, 100);
+ return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
index f6ada63..df3c00d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
@@ -41,7 +41,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
protected String resultPath;
public ConnectedComponentsITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -55,7 +55,7 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
WorksetConnectedComponents cc = new WorksetConnectedComponents();
- return cc.getPlan(Integer.valueOf(DOP).toString(), verticesPath, edgesPath, resultPath, "100");
+ return cc.getPlan(Integer.valueOf(parallelism).toString(), verticesPath, edgesPath, resultPath, "100");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index 0b1e372..d5d150d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -65,7 +65,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
super(config);
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -78,7 +78,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBa
@Override
protected Plan getTestJob() {
boolean extraMapper = config.getBoolean("ExtraMapper", false);
- return getPlan(DOP, verticesPath, edgesPath, resultPath, 100, extraMapper);
+ return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100, extraMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index 33650c5..b97d0ad 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -60,7 +60,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
protected String resultPath;
public ConnectedComponentsWithSolutionSetFirstITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -73,7 +73,7 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITest
@Override
protected Plan getTestJob() {
- return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(DOP, verticesPath, edgesPath,
+ return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(parallelism, verticesPath, edgesPath,
resultPath, 100);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index 39617b4..e2d095d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -40,7 +40,7 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
protected Plan getTestJob() {
DanglingPageRank pr = new DanglingPageRank();
Plan plan = pr.getPlan(
- String.valueOf(DOP),
+ String.valueOf(parallelism),
pagesPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
index 7f5015e..caa9d37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -40,7 +40,7 @@ public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTe
protected void testProgram() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, 9).map(new Duplicator<Long>());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
index cf59a3f..bf459c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
@@ -42,7 +42,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- String[] params = { String.valueOf(DOP) , verticesPath, edgesPath, resultPath, "3" };
+ String[] params = { String.valueOf(parallelism) , verticesPath, edgesPath, resultPath, "3" };
WorksetConnectedComponents cc = new WorksetConnectedComponents();
return cc.getPlan(params);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index 15079ec..0635fe5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
private static final int MAX_ITERATIONS = 20;
- private static final int DOP = 1;
+ private static final int parallelism = 1;
protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -51,7 +51,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
private String expectedResult;
public DependencyConnectedComponentsITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -113,7 +113,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
public static String runProgram(String resultPath) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index 7b6cf11..0915a42 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -47,7 +47,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
protected String resultPath;
public IterationTerminationWithTerminationTail(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -63,7 +63,7 @@ public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getTestPlanPlan(DOP, dataPath, resultPath);
+ return getTestPlanPlan(parallelism, dataPath, resultPath);
}
private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index f05b1c2..3ce021b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -47,7 +47,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
protected String resultPath;
public IterationTerminationWithTwoTails(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -63,7 +63,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- return getTestPlanPlan(DOP, dataPath, resultPath);
+ return getTestPlanPlan(parallelism, dataPath, resultPath);
}
private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index 3116a9d..cb16c15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -61,7 +61,7 @@ public class IterationWithAllReducerITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
- Plan plan = getTestPlanPlan(DOP, dataPath, resultPath);
+ Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
return plan;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index f626b75..c11c9ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -52,7 +52,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
public IterationWithChainingITCase(Configuration config) {
super(config);
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -74,7 +74,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
- config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP);
+ config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism);
return toParameterList(config1);
}
@@ -103,7 +103,7 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
static Plan getTestPlan(int numSubTasks, String input, String output) {
FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
- initialInput.setDegreeOfParallelism(1);
+ initialInput.setParallelism(1);
BulkIteration iteration = new BulkIteration("Loop");
iteration.setInput(initialInput);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
index 4cd72fd..ac3659a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
@@ -35,7 +35,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
protected String resultPath;
public IterativeKMeansITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -48,7 +48,7 @@ public class IterativeKMeansITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
+ return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
index 2491028..fcf43df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
@@ -35,7 +35,7 @@ public class KMeansITCase extends RecordAPITestBase {
protected String resultPath;
public KMeansITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -48,7 +48,7 @@ public class KMeansITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan(String.valueOf(DOP), dataPath, clusterPath, resultPath, "20");
+ return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
index 450b360..fa13656 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/LineRankITCase.java
@@ -59,7 +59,7 @@
//
// public LineRankITCase(Configuration config) {
// super(config);
-// setTaskManagerNumSlots(DOP);
+// setTaskManagerNumSlots(parallelism);
// }
//
// @Override
@@ -85,7 +85,7 @@
// @Parameters
// public static Collection<Object[]> getConfigurations() {
// Configuration config1 = new Configuration();
-// config1.setInteger("NumSubtasks", DOP);
+// config1.setInteger("NumSubtasks", parallelism);
// config1.setInteger("NumIterations", 5);
// return toParameterList(config1);
// }
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
index 8e42dd7..946d89b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
@@ -43,7 +43,7 @@ public class PageRankITCase extends RecordAPITestBase {
protected Plan getTestJob() {
SimplePageRank pr = new SimplePageRank();
Plan plan = pr.getPlan(
- String.valueOf(DOP),
+ String.valueOf(parallelism),
pagesPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 5aa6b42..44544d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -51,7 +51,7 @@ import org.junit.runners.Parameterized;
public class AggregatorsITCase extends MultipleProgramsTestBase {
private static final int MAX_ITERATIONS = 20;
- private static final int DOP = 2;
+ private static final int parallelism = 2;
private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
public AggregatorsITCase(TestExecutionMode mode){
@@ -81,7 +81,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -110,7 +110,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -139,7 +139,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
@@ -168,7 +168,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
@@ -202,7 +202,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index faaa541..8bf50de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaProgramTestBase {
private static final int MAX_ITERATIONS = 5;
- private static final int DOP = 1;
+ private static final int parallelism = 1;
protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -118,7 +118,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
public static String runProgram(String resultPath) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 4d890e9..e616a2b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -44,7 +44,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
public class ConnectedComponentsWithParametrizableConvergenceITCase extends JavaProgramTestBase {
private static final int MAX_ITERATIONS = 10;
- private static final int DOP = 1;
+ private static final int parallelism = 1;
protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
@@ -111,7 +111,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
public static String runProgram(String resultPath) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 0e568b6..7cec122 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -89,9 +89,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
private static final long MEM_PER_CONSUMER = 3;
- private static final int DOP = 4;
+ private static final int parallelism = 4;
- private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+ private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
protected String verticesPath;
@@ -101,7 +101,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
public ConnectedComponentsNepheleITCase(Configuration config) {
super(config);
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Parameters
@@ -135,14 +135,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
int type = config.getInteger("testcase", 0);
switch (type) {
case 1:
- return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
+ return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
case 2:
- return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, DOP, maxIterations);
+ return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
case 3:
- return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, DOP,
+ return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, parallelism,
maxIterations);
case 4:
- return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, DOP,
+ return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, parallelism,
maxIterations);
default:
throw new RuntimeException("Broken test configuration");
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
index f22bc84..516309c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
@@ -40,7 +40,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
protected String resultPath;
public DanglingPageRankNepheleITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@@ -54,7 +54,7 @@ public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
@Override
protected JobGraph getJobGraph() throws Exception {
String[] parameters = new String[] {
- Integer.valueOf(DOP).toString(),
+ Integer.valueOf(parallelism).toString(),
pagesWithRankPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
index c4ffd02..ba22ce5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
@@ -29,7 +29,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
protected String resultPath;
public DanglingPageRankWithCombinerNepheleITCase(){
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -42,7 +42,7 @@ public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase
@Override
protected JobGraph getJobGraph() throws Exception {
String[] parameters = new String[] {
- Integer.valueOf(DOP).toString(),
+ Integer.valueOf(parallelism).toString(),
pagesWithRankPath,
edgesPath,
resultPath,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index 2a8e84d..69ff083 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -79,7 +79,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
public IterationWithChainingNepheleITCase(Configuration config) {
super(config);
- setTaskManagerNumSlots(DOP);
+ setTaskManagerNumSlots(parallelism);
}
@Override
@@ -99,7 +99,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
@Parameterized.Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
- config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", DOP);
+ config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", parallelism);
config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
return toParameterList(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 58969c9..153a85e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -40,20 +40,20 @@ public class JobGraphUtils {
private JobGraphUtils() {}
public static <T extends FileInputFormat<?>> InputFormatVertex createInput(T stub, String path, String name, JobGraph graph,
- int degreeOfParallelism)
+ int parallelism)
{
stub.setFilePath(path);
- return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, degreeOfParallelism);
+ return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, parallelism);
}
private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
- int degreeOfParallelism)
+ int parallelism)
{
InputFormatVertex inputVertex = new InputFormatVertex(name);
graph.addVertex(inputVertex);
inputVertex.setInvokableClass(DataSourceTask.class);
- inputVertex.setParallelism(degreeOfParallelism);
+ inputVertex.setParallelism(parallelism);
TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
inputConfig.setStubWrapper(stub);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 0705500..8801dd6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -96,7 +96,7 @@ public class CustomCompensatableDanglingPageRank {
public static JobGraph getJobGraph(String[] args) throws Exception {
- int degreeOfParallelism = 2;
+ int parallelism = 2;
String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -114,7 +114,7 @@ public class CustomCompensatableDanglingPageRank {
double messageLoss = 0.75;
if (args.length >= 14) {
- degreeOfParallelism = Integer.parseInt(args[0]);
+ parallelism = Integer.parseInt(args[0]);
pageWithRankInputPath = args[1];
adjacencyListInputPath = args[2];
outputPath = args[3];
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRank {
// page rank input
InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRank {
// edges as adjacency list
InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -155,7 +155,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the head ---------------------
AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- degreeOfParallelism);
+ parallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -200,7 +200,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the join ---------------------
AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, degreeOfParallelism);
+ "IterationIntermediate", jobGraph, parallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -228,7 +228,7 @@ public class CustomCompensatableDanglingPageRank {
// ---------------- the tail (co group) --------------------
AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism);
+ parallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -264,7 +264,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the output ---------------------
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -273,7 +273,7 @@ public class CustomCompensatableDanglingPageRank {
// --------------- the auxiliaries ---------------------
- AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+ AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -292,7 +292,7 @@ public class CustomCompensatableDanglingPageRank {
JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+ tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 7bc300f..6f19c03 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -97,7 +97,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
public static JobGraph getJobGraph(String[] args) throws Exception {
- int degreeOfParallelism = 2;
+ int parallelism = 2;
String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -114,7 +114,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
double messageLoss = 0.75;
if (args.length >= 14) {
- degreeOfParallelism = Integer.parseInt(args[0]);
+ parallelism = Integer.parseInt(args[0]);
pageWithRankInputPath = args[1];
adjacencyListInputPath = args[2];
outputPath = args[3];
@@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// page rank input
InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
@@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// edges as adjacency list
InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
@@ -155,7 +155,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the head ---------------------
AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- degreeOfParallelism);
+ parallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -200,7 +200,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the join ---------------------
AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, degreeOfParallelism);
+ "IterationIntermediate", jobGraph, parallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -241,7 +241,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// ---------------- the tail (co group) --------------------
AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism);
+ parallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -278,7 +278,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the output ---------------------
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
@@ -287,7 +287,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
// --------------- the auxiliaries ---------------------
- AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+ AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -306,7 +306,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+ tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index f06f723..8216ccb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -77,7 +77,7 @@ public class CompensatableDanglingPageRank {
public static JobGraph getJobGraph(String[] args) throws Exception {
- int degreeOfParallelism = 2;
+ int parallelism = 2;
String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
@@ -95,7 +95,7 @@ public class CompensatableDanglingPageRank {
double messageLoss = 0.75;
if (args.length >= 15) {
- degreeOfParallelism = Integer.parseInt(args[0]);
+ parallelism = Integer.parseInt(args[0]);
pageWithRankInputPath = args[1];
adjacencyListInputPath = args[2];
outputPath = args[3];
@@ -119,7 +119,7 @@ public class CompensatableDanglingPageRank {
// page rank input
InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism);
+ pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
@@ -128,14 +128,14 @@ public class CompensatableDanglingPageRank {
// edges as adjacency list
InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism);
+ adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(recSerializer);
adjacencyListInputConfig.setOutputComparator(fieldZeroComparator, 0);
// --------------- the head ---------------------
- AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, degreeOfParallelism);
+ AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, parallelism);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setIterationId(ITERATION_ID);
@@ -179,7 +179,7 @@ public class CompensatableDanglingPageRank {
// --------------- the join ---------------------
- AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, degreeOfParallelism);
+ AbstractJobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, parallelism);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
@@ -207,7 +207,7 @@ public class CompensatableDanglingPageRank {
// ---------------- the tail (co group) --------------------
AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- degreeOfParallelism);
+ parallelism);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setIterationId(ITERATION_ID);
tailConfig.setIsWorksetUpdate();
@@ -244,7 +244,7 @@ public class CompensatableDanglingPageRank {
// --------------- the output ---------------------
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
+ OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(recSerializer, 0);
@@ -253,7 +253,7 @@ public class CompensatableDanglingPageRank {
// --------------- the auxiliaries ---------------------
- AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
+ AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
@@ -272,7 +272,7 @@ public class CompensatableDanglingPageRank {
JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
+ tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index dbf4798..5dc3867 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -61,7 +61,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testIntSortingDOP1() throws Exception {
+ public void testIntSortingParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -76,7 +76,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testStringSortingDOP1() throws Exception {
+ public void testStringSortingParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -98,7 +98,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testTupleSortingSingleAscDOP1() throws Exception {
+ public void testTupleSortingSingleAscParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -134,7 +134,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testTupleSortingSingleDescDOP1() throws Exception {
+ public void testTupleSortingSingleDescParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -169,7 +169,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testTupleSortingDualDOP1() throws Exception {
+ public void testTupleSortingDualParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -204,7 +204,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testTupleSortingNestedDOP1() throws Exception {
+ public void testTupleSortingNestedParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -230,7 +230,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testTupleSortingNestedDOP1_2() throws Exception {
+ public void testTupleSortingNestedParallelism1_2() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -256,7 +256,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testPojoSortingSingleDOP1() throws Exception {
+ public void testPojoSortingSingleParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -279,7 +279,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testPojoSortingDualDOP1() throws Exception {
+ public void testPojoSortingDualParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -306,7 +306,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testPojoSortingNestedDOP1() throws Exception {
+ public void testPojoSortingNestedParallelism1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -333,7 +333,7 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
}
@Test
- public void testSortingDOP4() throws Exception {
+ public void testSortingParallelism4() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
index 494f354..2a97c60 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -321,12 +321,12 @@ public class GroupCombineITCase extends MultipleProgramsTestBase {
}
@Test
- // check if dop 1 results in the same data like a shuffle
+ // check if parallelism of 1 results in the same data like a shuffle
public void testCheckPartitionShuffleDOP1() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
// data
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index e5f91b4..c5067f9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -138,7 +138,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -346,7 +346,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
+ env.setParallelism(2); // important because it determines how often the combiner is called
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple2<Integer, String>> reduceDs = ds.
@@ -394,7 +394,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* check correctness of groupReduce with descending group sort
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -456,7 +456,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -590,7 +590,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* check correctness of groupReduce with descending group sort
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
@@ -613,7 +613,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test int-based definition on group sort, for (full) nested Tuple
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer());
@@ -631,7 +631,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test int-based definition on group sort, for (partial) nested Tuple ASC
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
@@ -653,7 +653,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test string-based definition on group sort, for (partial) nested Tuple DESC
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
@@ -672,7 +672,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test string-based definition on group sort, for two grouping keys
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
// f0.f0 is first integer
@@ -691,7 +691,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test string-based definition on group sort, for two grouping keys with Pojos
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
// f0.f0 is first integer
@@ -711,7 +711,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
@@ -830,7 +830,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple2<Integer, String>> reduceDs = ds.
@@ -870,7 +870,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
@@ -915,7 +915,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test grouping with pojo containing multiple pojos (was a bug)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
@@ -947,7 +947,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Test Java collections within pojos ( == test kryo)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
// f0.f0 is first integer
@@ -982,7 +982,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
* Group by generic type
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 375baee..0080fb1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -527,7 +527,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -548,7 +548,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -569,7 +569,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
@@ -590,7 +590,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "((1,1),one),((1,1),one)\n" +
@@ -612,7 +612,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "((1,1),one),((1,1),one)\n" +
@@ -633,7 +633,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds1.join(ds2).where("*").equalTo("*");
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
@@ -655,7 +655,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
env.execute();
expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index e1603ca..3637680 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -148,8 +148,8 @@ public class PartitionITCase extends MultipleProgramsTestBase {
env.execute();
StringBuilder result = new StringBuilder();
- int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
- for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
+ int numPerPartition = 2220 / env.getParallelism() / 10;
+ for (int i = 0; i < env.getParallelism(); i++) {
result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
}
@@ -190,13 +190,13 @@ public class PartitionITCase extends MultipleProgramsTestBase {
}
@Test
- public void testHashPartitionByKeyFieldAndDifferentDOP() throws Exception {
+ public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
/*
- * Test hash partition by key field and different DOP
+ * Test hash partition by key field and different parallelism
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
+ env.setParallelism(3);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Long> uniqLongs = ds
@@ -221,7 +221,7 @@ public class PartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
+ env.setParallelism(3);
DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
DataSet<Long> uniqLongs = ds
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
index 4bba558..d961f3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -72,7 +72,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(4);
+ env.setParallelism(4);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
ds
@@ -94,7 +94,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(2);
+ env.setParallelism(2);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
ds
@@ -117,7 +117,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(4);
+ env.setParallelism(4);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
ds
@@ -139,7 +139,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(2);
+ env.setParallelism(2);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
ds
@@ -162,7 +162,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
+ env.setParallelism(3);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
ds
@@ -185,7 +185,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
+ env.setParallelism(3);
DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
ds
@@ -202,17 +202,17 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
}
@Test
- public void testSortPartitionDOPChange() throws Exception {
+ public void testSortPartitionParallelismChange() throws Exception {
/*
- * Test sort partition with DOP change
+ * Test sort partition with parallelism change
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
+ env.setParallelism(3);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
ds
- .sortPartition(1, Order.DESCENDING).setParallelism(3) // change DOP
+ .sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
.distinct()
.writeAsText(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 8ddd7bc..aeab77b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -78,7 +78,7 @@ public class AutoParallelismITCase {
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+ env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
DataSet<Integer> result = env
.createInput(new ParallelismDependentInputFormat())
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
index c308007..39a08d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -34,7 +34,7 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (!isCollectionExecution()) {
- Assert.assertTrue(env.getDegreeOfParallelism() > 1);
+ Assert.assertTrue(env.getParallelism() > 1);
}
env.generateSequence(1, 1000)
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
index 2087b63..e0ebadd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
@@ -48,7 +48,7 @@ public class NullValuesITCase {
ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
- env.setDegreeOfParallelism(1);
+ env.setParallelism(1);
DataSet<String> data = env.fromElements("hallo")
.map(new MapFunction<String, String>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 220611d..be05186 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -146,23 +146,23 @@ public class CoGroupITCase extends RecordAPITestBase {
FileDataSource input_left = new FileDataSource(new CoGroupTestInFormat(), leftInPath);
DelimitedInputFormat.configureDelimitedFormat(input_left)
.recordDelimiter('\n');
- input_left.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+ input_left.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
FileDataSource input_right = new FileDataSource(new CoGroupTestInFormat(), rightInPath);
DelimitedInputFormat.configureDelimitedFormat(input_right)
.recordDelimiter('\n');
- input_right.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+ input_right.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0)
.build();
- testCoGrouper.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
+ testCoGrouper.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
config.getString("CoGroupTest#LocalStrategy", ""));
testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
config.getString("CoGroupTest#ShipStrategy", ""));
FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath);
- output.setDegreeOfParallelism(1);
+ output.setParallelism(1);
output.setInput(testCoGrouper);
testCoGrouper.setFirstInput(input_left);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index f6b4127..3fde5a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -122,16 +122,16 @@ public class CrossITCase extends RecordAPITestBase {
new ContractITCaseInputFormat(), leftInPath);
DelimitedInputFormat.configureDelimitedFormat(input_left)
.recordDelimiter('\n');
- input_left.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+ input_left.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
FileDataSource input_right = new FileDataSource(
new ContractITCaseInputFormat(), rightInPath);
DelimitedInputFormat.configureDelimitedFormat(input_right)
.recordDelimiter('\n');
- input_right.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+ input_right.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
CrossOperator testCross = CrossOperator.builder(new TestCross()).build();
- testCross.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
+ testCross.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
config.getString("CrossTest#LocalStrategy", ""));
if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
@@ -151,7 +151,7 @@ public class CrossITCase extends RecordAPITestBase {
FileDataSink output = new FileDataSink(
new ContractITCaseOutputFormat(), resultPath);
- output.setDegreeOfParallelism(1);
+ output.setParallelism(1);
output.setInput(testCross);
testCross.setFirstInput(input_left);