You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:21 UTC
[42/53] [abbrv] flink git commit: [optimizer] Migrate first set of tests (branching plans) to new API
[optimizer] Migrate first set of tests (branching plans) to new API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2db1812
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2db1812
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2db1812
Branch: refs/heads/master
Commit: c2db18120c53dc8712b08369f7ea5b93ace98c6b
Parents: a9150b3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 17 16:14:50 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100
----------------------------------------------------------------------
.../optimizer/BranchingPlansCompilerTest.java | 414 ++++++++-----------
1 file changed, 180 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2db1812/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index ff0e004..916aa27 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer;
import static org.junit.Assert.*;
@@ -26,6 +25,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -73,39 +78,27 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
final int SINKS = 5;
try {
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
- MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-
- FileDataSink[] sinkA = new FileDataSink[SINKS];
- FileDataSink[] sinkB = new FileDataSink[SINKS];
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Long> source = env.generateSequence(1, 10000);
+
+ DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
+
for (int sink = 0; sink < SINKS; sink++) {
- sinkA[sink] = new FileDataSink(DummyOutputFormat.class, out1Path, mapA, "Sink A:" + sink);
- sinks.add(sinkA[sink]);
-
- sinkB[sink] = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B:" + sink);
- sinks.add(sinkB[sink]);
+ mappedA.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
}
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
+
+ Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
OptimizedPlan oPlan = compileNoStats(plan);
-
- // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
}
@@ -122,57 +115,44 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
* (SINK A) (SINK B) (SINK C)
* </pre>
*/
+ @SuppressWarnings("unchecked")
@Test
public void testBranchingWithMultipleDataSinks2() {
try {
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
- final String out3Path = "file:///test/3";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Long> source = env.generateSequence(1, 10000);
+
+ DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
+
+ mappedB.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
+
+ Plan plan = env.createProgramPlan();
+ Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
- MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
- MapOperator mapB = MapOperator.builder(IdentityMap.class).input(mapA).name("Map B").build();
- MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-
- FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, mapB, "Sink A");
- FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B");
- FileDataSink sinkC = new FileDataSink(DummyOutputFormat.class, out3Path, mapC, "Sink C");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sinkA);
- sinks.add(sinkB);
- sinks.add(sinkC);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
OptimizedPlan oPlan = compileNoStats(plan);
-
+
// ---------- check the optimizer plan ----------
-
+
// number of sinks
- Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
- // sinks contain all sink paths
- Set<String> allSinks = new HashSet<String>();
- allSinks.add(out1Path);
- allSinks.add(out2Path);
- allSinks.add(out3Path);
-
- for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
- Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
+ assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
+
+ // remove matching sinks to check relation
+ for (SinkPlanNode sink : oPlan.getDataSinks()) {
+ assertTrue(sinks.remove(sink.getProgramOperator()));
}
-
- // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
+ assertTrue(sinks.isEmpty());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
}
@@ -203,72 +183,64 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testBranchingSourceMultipleTimes() {
try {
- // construct the plan
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceA)
- .input2(sourceA)
- .build();
- JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceA)
- .input2(mat1)
- .build();
- JoinOperator mat3 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceA)
- .input2(mat2)
- .build();
- JoinOperator mat4 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceA)
- .input2(mat3)
- .build();
- JoinOperator mat5 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceA)
- .input2(mat4)
- .build();
-
- MapOperator ma = MapOperator.builder(new IdentityMap()).input(sourceA).build();
-
- JoinOperator mat6 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(ma)
- .build();
- JoinOperator mat7 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(mat6)
- .build();
- JoinOperator mat8 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(mat7)
- .build();
- JoinOperator mat9 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(mat8)
- .build();
- JoinOperator mat10 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(mat9)
- .build();
-
- CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0, 0)
- .input1(mat5)
- .input2(mat10)
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, co);
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Branching Source Multiple Times");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> mapped = source.map(
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+ return null;
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+
+ joined5.coGroup(joined10)
+ .where(1).equalTo(1)
+ .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
-
- //Compile plan to verify that no error is thrown
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
}
@@ -294,73 +266,54 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testBranchingWithMultipleDataSinks() {
try {
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
- final String out3Path = "file:///test/3";
-
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(sourceA)
- .input2(sourceB)
- .build();
- MapOperator ma = MapOperator.builder(new IdentityMap()).input(co).build();
- JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceB)
- .input2(sourceC)
- .build();
- JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(ma)
- .input2(mat1)
- .build();
- ReduceOperator r = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(ma)
- .build();
- CrossOperator c = CrossOperator.builder(new DummyCrossStub())
- .input1(r)
- .input2(mat2)
- .build();
-
- FileDataSink sinkA = new FileDataSink(new DummyOutputFormat(), out1Path, c);
- FileDataSink sinkB = new FileDataSink(new DummyOutputFormat(), out2Path, mat2);
- FileDataSink sinkC = new FileDataSink(new DummyOutputFormat(), out3Path, mat2);
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sinkA);
- sinks.add(sinkB);
- sinks.add(sinkC);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Branching Plans With Multiple Data Sinks");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
+ .where(0).equalTo(1)
+ .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> first,
+ Iterable<Tuple2<Long, Long>> second,
+ Collector<Tuple2<Long, Long>> out) {
+ }
+ })
+ .map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
+ .where(0).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
+ .where(1).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> reduced = mapped
+ .groupBy(1)
+ .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
+
+ reduced.cross(joined2)
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
-
- // ---------- check the optimizer plan ----------
-
- // number of sinks
- Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
- // sinks contain all sink paths
- Set<String> allSinks = new HashSet<String>();
- allSinks.add(out1Path);
- allSinks.add(out2Path);
- allSinks.add(out3Path);
-
- for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
- Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
- }
-
- // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
}
@@ -860,48 +813,37 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*/
@Test
public void testIterationWithStaticInput() {
- FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
- MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
- input(source).
- name("Identity mapped source").
- build();
-
- ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
- input(source).
- name("Identity reduce source").
- build();
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(mappedSource);
- iteration.setMaximumNumberOfIterations(10);
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
- JoinOperator nextPartialSolution = JoinOperator.builder(DummyMatchStub.class, IntValue.class, 0,0).
- input1(iteration.getPartialSolution()).
- input2(reducedSource).
- name("Next partial solution").
- build();
+ DataSet<Long> source = env.generateSequence(1, 1000000);
- iteration.setNextPartialSolution(nextPartialSolution);
+ DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
- FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink);
+ DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
- Plan plan = new Plan(sinks);
+ IterativeDataSet<Long> iteration = mapped.iterate(10);
+ iteration.closeWith(
+ iteration.join(reduced)
+ .where(new IdentityKeyExtractor<Long>())
+ .equalTo(new IdentityKeyExtractor<Long>())
+ .with(new DummyFlatJoinFunction<Long>()))
+ .output(new DiscardingOutputFormat<Long>());
- try{
- compileNoStats(plan);
- }catch(Exception e){
+ compileNoStats(env.createProgramPlan());
+ }
+ catch(Exception e){
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
}
@Test
public void testBranchingBroadcastVariable() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+ env.setDegreeOfParallelism(100);
+
DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
@@ -972,6 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testMultipleIterations() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
@@ -1000,7 +943,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testMultipleIterationsWithClosueBCVars() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+ env.setDegreeOfParallelism(100);
+
DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
IterativeDataSet<String> iteration1 = input.iterate(100);
@@ -1026,7 +970,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchesOnlyInBCVariables1() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+ env.setDegreeOfParallelism(100);
+
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc_input = env.generateSequence(1, 10);
@@ -1048,7 +993,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchesOnlyInBCVariables2() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+ env.setDegreeOfParallelism(100);
+
DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");