You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:17 UTC
[77/92] [abbrv] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables
[FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
Branch: refs/heads/travis_test
Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
Parents: a822486
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 11 18:02:52 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Jul 12 19:31:26 2014 +0200
----------------------------------------------------------------------
.../compiler/BranchingPlansCompilerTest.java | 10 +-
.../flink/compiler/PipelineBreakerTest.java | 137 +++++++++++++++++++
.../testfunctions/SelectOneReducer.java | 28 ++++
3 files changed, 170 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 31dadae..571f4e4 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
}
}
+ @SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testBranchEachContractType() {
try {
@@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
.name("Reduce 1")
.build();
- @SuppressWarnings("unchecked")
JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
.input1(sourceB, sourceB, sourceC)
.input2(sourceC)
@@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
.build();
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
- // sink.addInput(sourceA);
- // sink.addInput(co3);
- // sink.addInput(co4);
- // sink.addInput(co1);
+ sink.addInput(sourceA);
+ sink.addInput(cogroup3);
+ sink.addInput(cogroup4);
+ sink.addInput(cogroup1);
// return the PACT plan
Plan plan = new Plan(sink, "Branching of each contract type");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
new file mode 100644
index 0000000..4e43a74
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
@@ -0,0 +1,137 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.IterativeDataSet;
+import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+
+@SuppressWarnings("serial")
+public class PipelineBreakerTest extends CompilerTestBase {
+
+ @Test
+ public void testPipelineBreakerWithBroadcastVariable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> result = source.map(new IdentityMapper<Long>())
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(source, "bc");
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerBroadcastedAllReduce() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> bcInput1 = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .reduce(new SelectOneReducer<Long>());
+ DataSet<Long> bcInput2 = env.generateSequence(1, 10);
+
+ DataSet<Long> result = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(bcInput1, "bc1")
+ .withBroadcastSet(bcInput2, "bc2");
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerBroadcastedPartialSolution() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+ IterativeDataSet<Long> iteration = initialSource.iterate(100);
+
+
+ DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> bcInput1 = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .reduce(new SelectOneReducer<Long>());
+
+ DataSet<Long> result = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(iteration, "bc2")
+ .withBroadcastSet(bcInput1, "bc1");
+
+
+ iteration.closeWith(result).print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
new file mode 100644
index 0000000..492b9f8
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.java.functions.ReduceFunction;
+
+public class SelectOneReducer<T> extends ReduceFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T reduce(T value1, T value2) throws Exception {
+ return value1;
+ }
+}