You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:21 UTC

[42/53] [abbrv] flink git commit: [optimizer] Migrate first set of tests (branching plans) to new API

[optimizer] Migrate first set of tests (branching plans) to new API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2db1812
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2db1812
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2db1812

Branch: refs/heads/master
Commit: c2db18120c53dc8712b08369f7ea5b93ace98c6b
Parents: a9150b3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 17 16:14:50 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100

----------------------------------------------------------------------
 .../optimizer/BranchingPlansCompilerTest.java   | 414 ++++++++-----------
 1 file changed, 180 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2db1812/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index ff0e004..916aa27 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
@@ -26,6 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -73,39 +78,27 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		final int SINKS = 5;
 	
 		try {
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-	
-			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-	
-			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-	
-			MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
-			MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-	
-			FileDataSink[] sinkA = new FileDataSink[SINKS];
-			FileDataSink[] sinkB = new FileDataSink[SINKS];
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Long> source = env.generateSequence(1, 10000);
+
+			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
+
 			for (int sink = 0; sink < SINKS; sink++) {
-				sinkA[sink] = new FileDataSink(DummyOutputFormat.class, out1Path, mapA, "Sink A:" + sink);
-				sinks.add(sinkA[sink]);
-	
-				sinkB[sink] = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B:" + sink);
-				sinks.add(sinkB[sink]);
+				mappedA.output(new DiscardingOutputFormat<Long>());
+				mappedC.output(new DiscardingOutputFormat<Long>());
 			}
-	
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-	
+
+			Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
 			OptimizedPlan oPlan = compileNoStats(plan);
-	
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-	
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
+
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 
@@ -122,57 +115,44 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 *        (SINK A)    (SINK B)  (SINK C)
 	 * </pre>
 	 */
+	@SuppressWarnings("unchecked")
 	@Test
 	public void testBranchingWithMultipleDataSinks2() {
 		try {
-			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-			final String out3Path = "file:///test/3";
-	
-			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Long> source = env.generateSequence(1, 10000);
+
+			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
+
+			mappedB.output(new DiscardingOutputFormat<Long>());
+			mappedC.output(new DiscardingOutputFormat<Long>());
+			mappedC.output(new DiscardingOutputFormat<Long>());
+
+			Plan plan = env.createProgramPlan();
+			Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
 
-			MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
-			MapOperator mapB = MapOperator.builder(IdentityMap.class).input(mapA).name("Map B").build();
-			MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-			
-			FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, mapB, "Sink A");
-			FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B");
-			FileDataSink sinkC = new FileDataSink(DummyOutputFormat.class, out3Path, mapC, "Sink C");
-			
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-			sinks.add(sinkA);
-			sinks.add(sinkB);
-			sinks.add(sinkC);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-			
 			OptimizedPlan oPlan = compileNoStats(plan);
-			
+
 			// ---------- check the optimizer plan ----------
-			
+
 			// number of sinks
-			Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-			
-			// sinks contain all sink paths
-			Set<String> allSinks = new HashSet<String>();
-			allSinks.add(out1Path);
-			allSinks.add(out2Path);
-			allSinks.add(out3Path);
-			
-			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
-				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
+			assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
+
+			// remove matching sinks to check relation
+			for (SinkPlanNode sink : oPlan.getDataSinks()) {
+				assertTrue(sinks.remove(sink.getProgramOperator()));
 			}
-			
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
+			assertTrue(sinks.isEmpty());
+
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
@@ -203,72 +183,64 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testBranchingSourceMultipleTimes() {
 		try {
-			// construct the plan
-			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			
-			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceA)
-				.input2(sourceA)
-				.build();
-			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceA)
-				.input2(mat1)
-				.build();
-			JoinOperator mat3 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceA)
-				.input2(mat2)
-				.build();
-			JoinOperator mat4 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceA)
-				.input2(mat3)
-				.build();
-			JoinOperator mat5 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceA)
-				.input2(mat4)
-				.build();
-			
-			MapOperator ma = MapOperator.builder(new IdentityMap()).input(sourceA).build();
-			
-			JoinOperator mat6 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(ma)
-				.build();
-			JoinOperator mat7 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(mat6)
-				.build();
-			JoinOperator mat8 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(mat7)
-				.build();
-			JoinOperator mat9 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(mat8)
-				.build();
-			JoinOperator mat10 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(mat9)
-				.build();
-			
-			CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0, 0)
-				.input1(mat5)
-				.input2(mat10)
-				.build();
-	
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, co);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching Source Multiple Times");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
+				.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
+														.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> mapped = source.map(
+					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+							return null;
+						}
+			});
+
+			DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+
+			joined5.coGroup(joined10)
+					.where(1).equalTo(1)
+					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			Plan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			
-			//Compile plan to verify that no error is thrown
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
@@ -294,73 +266,54 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testBranchingWithMultipleDataSinks() {
 		try {
-			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-			final String out3Path = "file:///test/3";
-	
-			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			
-			CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(sourceA)
-				.input2(sourceB)
-				.build();
-			MapOperator ma = MapOperator.builder(new IdentityMap()).input(co).build();
-			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceB)
-				.input2(sourceC)
-				.build();
-			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(ma)
-				.input2(mat1)
-				.build();
-			ReduceOperator r = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(ma)
-				.build();
-			CrossOperator c = CrossOperator.builder(new DummyCrossStub())
-				.input1(r)
-				.input2(mat2)
-				.build();
-			
-			FileDataSink sinkA = new FileDataSink(new DummyOutputFormat(), out1Path, c);
-			FileDataSink sinkB = new FileDataSink(new DummyOutputFormat(), out2Path, mat2);
-			FileDataSink sinkC = new FileDataSink(new DummyOutputFormat(), out3Path, mat2);
-			
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-			sinks.add(sinkA);
-			sinks.add(sinkB);
-			sinks.add(sinkC);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Branching Plans With Multiple Data Sinks");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
+					.where(0).equalTo(1)
+					.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+							@Override
+							public void coGroup(Iterable<Tuple2<Long, Long>> first,
+													Iterable<Tuple2<Long, Long>> second,
+													Collector<Tuple2<Long, Long>> out) {
+							  }
+					})
+					.map(new IdentityMapper<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
+					.where(0).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
+					.where(1).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> reduced = mapped
+					.groupBy(1)
+					.reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
+
+			reduced.cross(joined2)
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+			Plan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			// ---------- check the optimizer plan ----------
-			
-			// number of sinks
-			Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-			
-			// sinks contain all sink paths
-			Set<String> allSinks = new HashSet<String>();
-			allSinks.add(out1Path);
-			allSinks.add(out2Path);
-			allSinks.add(out3Path);
-			
-			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
-				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
-			}
-			
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
@@ -860,48 +813,37 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testIterationWithStaticInput() {
-		FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-		MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
-				input(source).
-				name("Identity mapped source").
-				build();
-
-		ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
-				input(source).
-				name("Identity reduce source").
-				build();
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(mappedSource);
-		iteration.setMaximumNumberOfIterations(10);
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(100);
 
-		JoinOperator nextPartialSolution = JoinOperator.builder(DummyMatchStub.class, IntValue.class, 0,0).
-				input1(iteration.getPartialSolution()).
-				input2(reducedSource).
-				name("Next partial solution").
-				build();
+			DataSet<Long> source = env.generateSequence(1, 1000000);
 
-		iteration.setNextPartialSolution(nextPartialSolution);
+			DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
 
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink);
+			DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
 
-		Plan plan = new Plan(sinks);
+			IterativeDataSet<Long> iteration = mapped.iterate(10);
+			iteration.closeWith(
+					iteration.join(reduced)
+							.where(new IdentityKeyExtractor<Long>())
+							.equalTo(new IdentityKeyExtractor<Long>())
+							.with(new DummyFlatJoinFunction<Long>()))
+					.output(new DiscardingOutputFormat<Long>());
 
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
+			compileNoStats(env.createProgramPlan());
+		}
+		catch(Exception e){
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testBranchingBroadcastVariable() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+		env.setDegreeOfParallelism(100);
+
 		DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
 		DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
 		DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
@@ -972,6 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testMultipleIterations() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(100);
 		
 		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
 		
@@ -1000,7 +943,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testMultipleIterationsWithClosueBCVars() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+		env.setDegreeOfParallelism(100);
+
 		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
 			
 		IterativeDataSet<String> iteration1 = input.iterate(100);
@@ -1026,7 +970,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchesOnlyInBCVariables1() {
 		try{
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+			env.setDegreeOfParallelism(100);
+
 			DataSet<Long> input = env.generateSequence(1, 10);
 			DataSet<Long> bc_input = env.generateSequence(1, 10);
 			
@@ -1048,7 +993,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchesOnlyInBCVariables2() {
 		try{
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+			env.setDegreeOfParallelism(100);
+
 			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
 			
 			DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");