Posted to commits@flink.apache.org by mx...@apache.org on 2015/03/23 09:09:25 UTC
[5/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
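The commit renames the "degree of parallelism" setters to plain "parallelism" throughout the optimizer tests (env.setDegreeOfParallelism -> env.setParallelism, Optimizer.setDefaultDegreeOfParallelism -> setDefaultParallelism, operator.setDegreeOfParallelism -> setParallelism) and renames DOPChangeTest to ParallelismChangeTest. As a rough sketch of the user-facing side of the rename (the method names are taken from the diff below; the surrounding job is illustrative only, not part of this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ParallelismRenameExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // before FLINK-1679: env.setDegreeOfParallelism(8);
            env.setParallelism(8); // the consistent name used after this commit

            // a hypothetical job, just to give the setting something to act on
            DataSet<Long> source = env.generateSequence(1, 10000);
            source.print();
        }
    }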
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 916aa27..2df08a0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -79,7 +79,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Long> source = env.generateSequence(1, 10000);
@@ -120,7 +120,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchingWithMultipleDataSinks2() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Long> source = env.generateSequence(1, 10000);
@@ -184,7 +184,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchingSourceMultipleTimes() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
.map(new Duplicator<Long>());
@@ -267,7 +267,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchingWithMultipleDataSinks() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
.map(new Duplicator<Long>());
@@ -815,7 +815,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testIterationWithStaticInput() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<Long> source = env.generateSequence(1, 1000000);
@@ -842,7 +842,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testBranchingBroadcastVariable() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
@@ -914,7 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testMultipleIterations() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
@@ -943,7 +943,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testMultipleIterationsWithClosueBCVars() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
@@ -970,7 +970,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchesOnlyInBCVariables1() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc_input = env.generateSequence(1, 10);
@@ -993,7 +993,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchesOnlyInBCVariables2() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
+ env.setParallelism(100);
DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 3e7da6c..47efeb1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -203,7 +203,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
private Plan getTestPlanRightStatic(String strategy) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
@@ -231,7 +231,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
private Plan getTestPlanLeftStatic(String strategy) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
index 565d992..4eed236 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -71,10 +71,10 @@ public abstract class CompilerTestBase implements java.io.Serializable {
public void setup() {
this.dataStats = new DataStatistics();
this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
- this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+ this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
- this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+ this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
deleted file mode 100644
index b17e777..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.Visitor;
-import org.junit.Test;
-
-/**
- * Tests in this class:
- * <ul>
- * <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- * parallelism between tasks is increased or decreased.
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class DOPChangeTest extends CompilerTestBase {
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
- * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
- * transit as well.
- */
- @Test
- public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
- }
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
- * Expected to re-establish partitioning between map and reduce (hash).
- */
- @Test
- public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
- }
-
- /**
- * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
- *
- * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
- * Expected to re-establish partitioning between map and reduce via a local hash.
- */
- @Test
- public void checkPropertyHandlingWithIncreasingLocalParallelism() {
- final int degOfPar = 2 * DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
- ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-
- Assert.assertTrue("Invalid ship strategy for an operator.",
- (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
- (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
- }
-
-
-
- @Test
- public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setDegreeOfParallelism(degOfPar * 2);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setDegreeOfParallelism(degOfPar * 2);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setDegreeOfParallelism(degOfPar * 2);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setDegreeOfParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setDegreeOfParallelism(degOfPar);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setDegreeOfParallelism(degOfPar);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-
- // submit the plan to the compiler
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // check the optimized Plan
- // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
- // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
- // mapper respectively reducer
- SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
- SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
- SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
- Assert.assertTrue("The no sorting local strategy.",
- LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
- LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
-
- Assert.assertTrue("The no partitioning ship strategy.",
- ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
- ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
- }
-
- /**
- * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
- *
- * Test Plan:
- * <pre>
- *
- * (source) -> reduce -\
- * Match -> (sink)
- * (source) -> reduce -/
- *
- * </pre>
- *
- */
- @Test
- public void checkPropertyHandlingWithTwoInputs() {
- // construct the plan
-
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceA)
- .build();
- ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceB)
- .build();
-
- JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(redA)
- .input2(redB)
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-
- sourceA.setDegreeOfParallelism(5);
- sourceB.setDegreeOfParallelism(7);
- redA.setDegreeOfParallelism(5);
- redB.setDegreeOfParallelism(7);
-
- mat.setDegreeOfParallelism(5);
-
- sink.setDegreeOfParallelism(5);
-
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Partition on DoP Change");
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
-
- //Compile plan to verify that no error is thrown
- jobGen.compileJobGraph(oPlan);
-
- oPlan.accept(new Visitor<PlanNode>() {
-
- @Override
- public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof DualInputPlanNode) {
- DualInputPlanNode node = (DualInputPlanNode) visitable;
- Channel c1 = node.getInput1();
- Channel c2 = node.getInput2();
-
- Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
- Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
- return false;
- }
- return true;
- }
-
- @Override
- public void postVisit(PlanNode visitable) {
- // DO NOTHING
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 34aa9f8..3b7eae7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -42,7 +42,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
public void testDistinctPlain() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -77,7 +77,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
@@ -94,7 +94,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
public void testDistinctWithSelectorFunctionKey() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -135,7 +135,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(0), combineNode.getKeys(0));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, keyExtractor.getParallelism());
assertEquals(6, combineNode.getParallelism());
@@ -155,7 +155,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
public void testDistinctWithFieldPositionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -191,7 +191,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(1), combineNode.getKeys(0));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index ac4f820..810ec0e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -91,7 +91,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testTwoIterationsWithMapperInbetween() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
@@ -129,7 +129,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testTwoIterationsDirectlyChained() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
@@ -165,7 +165,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testTwoWorksetIterationsDirectlyChained() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
@@ -201,7 +201,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testIterationPushingWorkOut() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
@@ -235,7 +235,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testWorksetIterationPipelineBreakerPlacement() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
new file mode 100644
index 0000000..a54136a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ * <li>Tests that check the correct handling of the properties and strategies in the case where the
+ * parallelism between tasks is increased or decreased.
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class ParallelismChangeTest extends CompilerTestBase {
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases parallelism between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
+ * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
+ * transit as well.
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setParallelism(degOfPar * 2);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+ // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+ // mapper respectively reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
+ }
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases parallelism between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
+ * Expected to re-establish partitioning between map and reduce (hash).
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setParallelism(degOfPar);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+ // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+ // mapper respectively reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+ }
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases parallelism between 1st reduce and 2nd map, such that more tasks are on one instance.
+ * Expected to re-establish partitioning between map and reduce via a local hash.
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+ final int degOfPar = 2 * DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setParallelism(degOfPar * 2);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+ // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+ // mapper respectively reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertTrue("Invalid ship strategy for an operator.",
+ (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
+ (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
+ }
+
+
+
+ @Test
+ public void checkPropertyHandlingWithDecreasingParallelism() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setParallelism(degOfPar * 2);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setParallelism(degOfPar * 2);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setParallelism(degOfPar * 2);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setParallelism(degOfPar);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setParallelism(degOfPar);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setParallelism(degOfPar);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+ // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+ // mapper respectively reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ Assert.assertTrue("The no sorting local strategy.",
+ LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+ LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+ Assert.assertTrue("The no partitioning ship strategy.",
+ ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+ ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+ }
+
+ /**
+ * Checks that re-partitioning happens when the inputs of a two-input contract have different parallelisms.
+ *
+ * Test Plan:
+ * <pre>
+ *
+ * (source) -> reduce -\
+ * Match -> (sink)
+ * (source) -> reduce -/
+ *
+ * </pre>
+ *
+ */
+ @Test
+ public void checkPropertyHandlingWithTwoInputs() {
+ // construct the plan
+
+ FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+ FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+ ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(sourceA)
+ .build();
+ ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(sourceB)
+ .build();
+
+ JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+ .input1(redA)
+ .input2(redB)
+ .build();
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
+
+ sourceA.setParallelism(5);
+ sourceB.setParallelism(7);
+ redA.setParallelism(5);
+ redB.setParallelism(7);
+
+ mat.setParallelism(5);
+
+ sink.setParallelism(5);
+
+
+ // return the PACT plan
+ Plan plan = new Plan(sink, "Partition on DoP Change");
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ JobGraphGenerator jobGen = new JobGraphGenerator();
+
+ //Compile plan to verify that no error is thrown
+ jobGen.compileJobGraph(oPlan);
+
+ oPlan.accept(new Visitor<PlanNode>() {
+
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ if (visitable instanceof DualInputPlanNode) {
+ DualInputPlanNode node = (DualInputPlanNode) visitable;
+ Channel c1 = node.getInput1();
+ Channel c2 = node.getInput2();
+
+ Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
+ Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {
+ // DO NOTHING
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 86f01b0..31f71d1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -41,7 +41,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
public void testPipelineBreakerWithBroadcastVariable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
@@ -69,7 +69,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
public void testPipelineBreakerBroadcastedAllReduce() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
@@ -103,7 +103,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
public void testPipelineBreakerBroadcastedPartialSolution() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
@@ -144,7 +144,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
try {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
@@ -166,7 +166,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
@@ -189,7 +189,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
@@ -212,7 +212,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(64);
+ env.setParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index 7be2b16..3cf081f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -55,7 +55,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource1() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -88,7 +88,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource2() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -121,7 +121,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource3() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -153,7 +153,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource4() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -185,7 +185,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource5() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -217,7 +217,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource6() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -249,7 +249,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedSource7() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -283,7 +283,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource1() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -317,7 +317,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource2() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -352,7 +352,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource3() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -386,7 +386,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource4() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -420,7 +420,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource5() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -454,7 +454,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource6() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -488,7 +488,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource7() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -521,7 +521,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedGroupedSource8() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -555,7 +555,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource1() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -589,7 +589,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource2() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -624,7 +624,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource3() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -658,7 +658,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource4() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -692,7 +692,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource5() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -726,7 +726,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource6() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -759,7 +759,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkSinglePartitionedOrderedSource7() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
@@ -793,7 +793,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkCoPartitionedSources1() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data1 =
env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -841,7 +841,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
public void checkCoPartitionedSources2() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
DataSource<Tuple2<Long, String>> data1 =
env.readCsvFile("/some/path").types(Long.class, String.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index da44b59..fd451f7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -53,7 +53,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInput() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -89,7 +89,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInputBehindMap() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -126,7 +126,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInputBehindFilter() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -163,7 +163,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -200,7 +200,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -237,7 +237,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -277,7 +277,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkCrossWithReplicatedSourceInput() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -313,7 +313,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
public void checkCrossWithReplicatedSourceInputBehindMap() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -345,13 +345,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
}
/**
- * Tests compiler fail for join program with replicated data source and changing DOP.
+ * Tests compiler fail for join program with replicated data source and changing parallelism.
*/
@Test(expected = CompilerException.class)
- public void checkJoinWithReplicatedSourceInputChangingDOP() {
+ public void checkJoinWithReplicatedSourceInputChangingParallelism() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -370,13 +370,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
}
/**
- * Tests compiler fail for join program with replicated data source behind map and changing DOP.
+ * Tests that the compiler fails for a join program with a replicated data source behind a map and changing parallelism.
*/
@Test(expected = CompilerException.class)
- public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() {
+ public void checkJoinWithReplicatedSourceInputBehindMapChangingParallelism() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -402,7 +402,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
@Test(expected = CompilerException.class)
public void checkJoinWithReplicatedSourceInputBehindReduce() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -427,7 +427,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
@Test(expected = CompilerException.class)
public void checkJoinWithReplicatedSourceInputBehindRebalance() {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
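For context on why the two ChangingParallelism tests above expect a CompilerException: a replicated source is only correct if every parallel join task reads the complete input, and that only holds while the parallelism is unchanged between the source and the join. A minimal sketch against the DataSet API used in these tests (the paths are placeholders, as in the tests themselves; the identity map is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.setParallelism(4);

ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
    new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
        new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));

DataSet<Tuple1<String>> replicated = env.createInput(rif,
    new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple1<String>> other =
    env.readCsvFile("/some/other/path").types(String.class);

// Fine: the parallelism is constant from the replicated source to the join.
replicated.join(other).where(0).equalTo(0).print();

// Rejected: changing the parallelism between the replicated source and the
// join breaks the "every join task sees all replicated data" assumption,
// so the optimizer throws a CompilerException.
replicated
    .map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
        public Tuple1<String> map(Tuple1<String> v) { return v; }
    }).setParallelism(1)
    .join(other).where(0).equalTo(0).print();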
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index d397ea2..f865a9f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -45,7 +45,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
.rebalance();
@@ -88,7 +88,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
.rebalance();
@@ -115,7 +115,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Pojo> data = env.fromElements(new Pojo())
.rebalance();
@@ -158,7 +158,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Pojo> data = env.fromElements(new Pojo())
.rebalance();
@@ -185,7 +185,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Pojo> data = env.fromElements(new Pojo())
.rebalance();
@@ -237,7 +237,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
final int parallelism = 4;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(parallelism);
+ env.setParallelism(parallelism);
DataSet<Pojo> data = env.fromElements(new Pojo())
.rebalance();
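The tests above all pair rebalance() with a custom partitioning step that the hunks no longer show. For orientation, a hedged sketch of what such a program looks like with the renamed API (the routing logic is illustrative, not taken from the test):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

DataSet<Tuple2<Integer, Integer>> data =
    env.fromElements(new Tuple2<Integer, Integer>(0, 0)).rebalance();

// User-controlled routing of records to the 4 parallel instances,
// keyed on tuple field 0.
data.partitionCustom(new Partitioner<Integer>() {
        public int partition(Integer key, int numPartitions) {
            return Math.abs(key) % numPartitions; // illustrative routing
        }
    }, 0)
    .print();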
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index de02836..2f9b32f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -39,7 +39,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
public void testDistinctPreservesPartitioningOfDistinctFields() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(4);
+ env.setParallelism(4);
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
@@ -75,7 +75,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
public void testDistinctDestroysPartitioningOfNonDistinctFields() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(4);
+ env.setParallelism(4);
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
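What these two tests check, in program form: distinct(0) establishes a hash partitioning on field 0 that a later groupBy(0) can reuse, while a groupBy on any other field cannot. A small sketch, assuming the era's DataSet API and reusing the two-tuple sample from the tests:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

DataSet<Tuple2<Long, Long>> data = env.fromElements(
        new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L));

// Partitioning on field 0 survives distinct(0): no second shuffle should
// be needed before the grouping.
data.distinct(0).groupBy(0).sum(1).print();

// Partitioning on field 0 says nothing about field 1: a shuffle is required.
data.distinct(0).groupBy(1).sum(0).print();

env.execute();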
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index a683968..c0e2fa7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -44,7 +44,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testAllGroupReduceNoCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
@@ -59,7 +59,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
- // the all-reduce has no combiner, when the DOP of the input is one
+ // the all-reduce has no combiner when the parallelism of the input is one
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
@@ -72,7 +72,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
// check that reduce has the right strategy
assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
- // check DOP
+ // check parallelism
assertEquals(1, sourceNode.getParallelism());
assertEquals(1, reduceNode.getParallelism());
assertEquals(1, sinkNode.getParallelism());
@@ -88,7 +88,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testAllReduceWithCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
@@ -120,7 +120,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
- // check DOP
+ // check parallelism
assertEquals(8, sourceNode.getParallelism());
assertEquals(8, combineNode.getParallelism());
assertEquals(1, reduceNode.getParallelism());
@@ -138,7 +138,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -171,7 +171,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(1), reduceNode.getKeys(0));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
assertEquals(8, sinkNode.getParallelism());
@@ -187,7 +187,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testGroupedReduceWithFieldPositionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -228,7 +228,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(1), combineNode.getKeys(1));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
@@ -245,7 +245,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -284,7 +284,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(0), reduceNode.getKeys(0));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, keyExtractor.getParallelism());
@@ -303,7 +303,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -350,7 +350,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(0), combineNode.getKeys(1));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, keyExtractor.getParallelism());
assertEquals(6, combineNode.getParallelism());
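The parallelism assertions above encode a simple contract: a combiner always runs with the parallelism of its input, while the final reduce runs with the surrounding (environment or explicitly set) parallelism. A sketch of the 6-to-8 setup these tests build; the pass-through reduce function body is illustrative:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);

DataSet<Tuple2<String, Double>> data = env
    .readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
    .name("source").setParallelism(6);

// The source and, if the function is combinable, the combiner run at
// parallelism 6; the shuffled final group reduce runs at the environment
// parallelism of 8.
data.groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
        public void reduce(Iterable<Tuple2<String, Double>> values,
                           Collector<Tuple2<String, Double>> out) {
            for (Tuple2<String, Double> value : values) {
                out.collect(value); // illustrative pass-through
            }
        }
    })
    .name("reducer")
    .print();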
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 37a8e81..57d2d54 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -45,7 +45,7 @@ public class IterationCompilerTest extends CompilerTestBase {
public void testIdentityIteration() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(43);
+ env.setParallelism(43);
IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
iteration.closeWith(iteration).print();
@@ -65,7 +65,7 @@ public class IterationCompilerTest extends CompilerTestBase {
public void testEmptyWorksetIteration() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(43);
+ env.setParallelism(43);
DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
.map(new MapFunction<Long, Tuple2<Long, Long>>() {
@@ -93,7 +93,7 @@ public class IterationCompilerTest extends CompilerTestBase {
public void testIterationWithUnionRoot() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(43);
+ env.setParallelism(43);
IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
@@ -132,7 +132,7 @@ public class IterationCompilerTest extends CompilerTestBase {
public void testWorksetIterationWithUnionRoot() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(43);
+ env.setParallelism(43);
DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
.map(new MapFunction<Long, Tuple2<Long, Long>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 0724a9f..e1b18f9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -42,7 +42,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
public void testAllReduceNoCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
@@ -61,7 +61,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
- // the all-reduce has no combiner, when the DOP of the input is one
+ // the all-reduce has no combiner when the parallelism of the input is one
SourcePlanNode sourceNode = resolver.getNode("source");
SingleInputPlanNode reduceNode = resolver.getNode("reducer");
@@ -71,7 +71,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(sourceNode, reduceNode.getInput().getSource());
assertEquals(reduceNode, sinkNode.getInput().getSource());
- // check DOP
+ // check parallelism
assertEquals(1, sourceNode.getParallelism());
assertEquals(1, reduceNode.getParallelism());
assertEquals(1, sinkNode.getParallelism());
@@ -87,7 +87,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
public void testAllReduceWithCombiner() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
@@ -121,7 +121,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy());
assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
- // check DOP
+ // check parallelism
assertEquals(8, sourceNode.getParallelism());
assertEquals(8, combineNode.getParallelism());
assertEquals(1, reduceNode.getParallelism());
@@ -138,7 +138,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
public void testGroupedReduceWithFieldPositionKey() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -179,7 +179,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(new FieldList(1), combineNode.getKeys(0));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, combineNode.getParallelism());
assertEquals(8, reduceNode.getParallelism());
@@ -196,7 +196,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
public void testGroupedReduceWithSelectorFunctionKey() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
+ env.setParallelism(8);
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
@@ -243,7 +243,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(new FieldList(0), combineNode.getKeys(0));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
- // check DOP
+ // check parallelism
assertEquals(6, sourceNode.getParallelism());
assertEquals(6, keyExtractor.getParallelism());
assertEquals(6, combineNode.getParallelism());
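The same contract holds for plain reduce, where a combiner is always possible: for an ungrouped all-reduce the combiner keeps the input parallelism and the final reduce collapses to a single task, which is exactly the 8/8/1 assertion above. A minimal sketch:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);

DataSet<Long> data = env.generateSequence(1, 8000000).name("source");

// The combine step pre-aggregates at parallelism 8; the final all-reduce
// runs with parallelism 1, since a single global result needs a single task.
data.reduce(new ReduceFunction<Long>() {
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    })
    .name("reducer")
    .print();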
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 8720aa7..f1c2233 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -196,7 +196,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
@@ -245,7 +245,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ env.setParallelism(DEFAULT_PARALLELISM);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 91d01a2..d0615b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -71,7 +71,7 @@ import static akka.dispatch.Futures.future;
* The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
* from the JobGraph's corresponding JobVertex.</li>
* <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
- * as many ExecutionVertices as the degree of parallelism. The ExecutionVertex is identified by
+ * as many ExecutionVertices as the vertex's parallelism. The ExecutionVertex is identified by
* the ExecutionJobVertex and the number of the parallel subtask</li>
* <li>The {@link Execution} is one attempt to execute an ExecutionVertex. There may be multiple Executions
* for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
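A hypothetical, non-Flink model of the containment this Javadoc describes, just to make the one-ExecutionVertex-per-parallel-subtask relation concrete (class and field names here are illustrative, not Flink's):

// Illustrative only: mirrors the ExecutionJobVertex -> ExecutionVertex ->
// Execution containment, with one vertex per parallel subtask.
class MiniExecutionJobVertex {
    final MiniExecutionVertex[] subtasks;
    MiniExecutionJobVertex(int parallelism) {
        subtasks = new MiniExecutionVertex[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new MiniExecutionVertex(i);
        }
    }
}

class MiniExecutionVertex {
    final int subtaskIndex;
    // one entry per Execution attempt (e.g. after a failure)
    final java.util.List<String> executionAttempts = new java.util.ArrayList<String>();
    MiniExecutionVertex(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }
}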
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 8816a69..c948155 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -194,22 +194,22 @@ public class AbstractJobVertex implements java.io.Serializable {
}
/**
- * Gets the degree of parallelism of the task.
+ * Gets the parallelism of the task.
*
- * @return The degree of parallelism of the task.
+ * @return The parallelism of the task.
*/
public int getParallelism() {
return parallelism;
}
/**
- * Sets the degree of parallelism for the task.
+ * Sets the parallelism for the task.
*
- * @param parallelism The degree of parallelism for the task.
+ * @param parallelism The parallelism for the task.
*/
public void setParallelism(int parallelism) {
if (parallelism < 1) {
- throw new IllegalArgumentException("The degree of parallelism must be at least one.");
+ throw new IllegalArgumentException("The parallelism must be at least one.");
}
this.parallelism = parallelism;
}
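A short usage sketch of the setter above, assuming the AbstractJobVertex(String name) constructor from this source tree; note the validation message now matches the renamed terminology:

AbstractJobVertex vertex = new AbstractJobVertex("my task");
vertex.setParallelism(4);
assert vertex.getParallelism() == 4;

// Rejected with "The parallelism must be at least one.":
try {
    vertex.setParallelism(0);
} catch (IllegalArgumentException expected) {
    // expected: parallelism must be >= 1
}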
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
index fb32a6e..47b1b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
@@ -36,7 +36,7 @@ public enum ShipStrategyType {
FORWARD(false, false),
/**
- * Repartitioning the data randomly, typically when the degree of parallelism between two nodes changes.
+ * Repartitioning the data randomly, typically when the parallelism between two nodes changes.
*/
PARTITION_RANDOM(true, false),
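An example of an edge where the optimizer may pick such a strategy: the producer and consumer parallelism differ and no key-based distribution is required downstream (sketch; the identity mapper is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 2 producer instances feed 8 consumer instances: the records have to be
// redistributed, but no keys are involved, so a random/round-robin strategy
// such as PARTITION_RANDOM can be chosen.
env.generateSequence(1, 1000).setParallelism(2)
    .map(new MapFunction<Long, Long>() {
        public Long map(Long value) {
            return value; // illustrative identity
        }
    }).setParallelism(8)
    .print();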
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index dfe6b50..f0001a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -86,7 +86,7 @@ public class LocalInputSplitsTest {
new TestLocatableInputSplit(3, "host3")
};
- // This should fail with an exception, since the DOP of 2 does not
+ // This should fail with an exception, since the parallelism of 2 does not
// support strictly local assignment onto 3 hosts
try {
runTests(numHosts, slotsPerHost, parallelism, splits);
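The feasibility rule behind that comment, as a hedged sketch (a hypothetical helper, not Flink API): strictly local assignment needs at least one subtask on every host that owns a split, so the number of such hosts bounds the minimum parallelism.

// Hypothetical helper, not part of Flink: a necessary condition for
// strictly local split assignment.
class StrictlyLocalCheck {
    static boolean assignmentPossible(int parallelism, java.util.Set<String> hostsWithSplits) {
        // every host that owns a split needs at least one local subtask
        return parallelism >= hostsWithSplits.size();
    }
}

// With splits on host1..host3 but a parallelism of 2 this returns false,
// which is why the runTests(...) call above is expected to throw.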