Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:47 UTC
[08/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
new file mode 100644
index 0000000..565d992
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * to the nodes in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
+
+ protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
+
+ protected static final int DEFAULT_PARALLELISM = 8;
+
+ private static final String CACHE_KEY = "cachekey";
+
+ // ------------------------------------------------------------------------
+
+ protected transient DataStatistics dataStats;
+
+ protected transient Optimizer withStatsCompiler;
+
+ protected transient Optimizer noStatsCompiler;
+
+ private transient int statCounter;
+
+ // ------------------------------------------------------------------------
+
+ @Before
+ public void setup() {
+ this.dataStats = new DataStatistics();
+ this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
+ this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
+ this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public OptimizedPlan compileWithStats(Plan p) {
+ return this.withStatsCompiler.compile(p);
+ }
+
+ public OptimizedPlan compileNoStats(Plan p) {
+ return this.noStatsCompiler.compile(p);
+ }
+
+ public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
+ setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
+ }
+
+ public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
+ final String key = CACHE_KEY + this.statCounter++;
+ this.dataStats.cacheBaseStatistics(stats, key);
+ source.setStatisticsKey(key);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+ return new OptimizerPlanNodeResolver(plan);
+ }
+
+ public static final class OptimizerPlanNodeResolver {
+
+ private final Map<String, ArrayList<PlanNode>> map;
+
+ OptimizerPlanNodeResolver(OptimizedPlan p) {
+ HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
+
+ for (PlanNode n : p.getAllNodes()) {
+ Operator<?> c = n.getOriginalOptimizerNode().getOperator();
+ String name = c.getName();
+
+ ArrayList<PlanNode> list = map.get(name);
+ if (list == null) {
+ list = new ArrayList<PlanNode>(2);
+ map.put(name, list);
+ }
+
+ // check whether this node is a child of a node with the same contract (aka combiner)
+ boolean shouldAdd = true;
+ for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
+ PlanNode in = iter.next();
+ if (in.getOriginalOptimizerNode().getOperator() == c) {
+ // determine which of the two nodes is the child of the other
+ if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+ SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
+ SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
+
+ if (thisNode.getPredecessor() == otherNode) {
+ // other node is child, remove it
+ iter.remove();
+ } else if (otherNode.getPredecessor() == thisNode) {
+ shouldAdd = false;
+ }
+ } else {
+ throw new RuntimeException("Unrecognized case in test.");
+ }
+ }
+ }
+
+ if (shouldAdd) {
+ list.add(n);
+ }
+ }
+
+ this.map = map;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public <T extends PlanNode> T getNode(String name) {
+ List<PlanNode> nodes = this.map.get(name);
+ if (nodes == null || nodes.isEmpty()) {
+ throw new RuntimeException("No node found with the given name.");
+ } else if (nodes.size() != 1) {
+ throw new RuntimeException("Multiple nodes found with the given name.");
+ } else {
+ return (T) nodes.get(0);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+ List<PlanNode> nodes = this.map.get(name);
+ if (nodes == null || nodes.isEmpty()) {
+ throw new RuntimeException("No node found with the given name and stub class.");
+ } else {
+ PlanNode found = null;
+ for (PlanNode node : nodes) {
+ if (node.getClass() == stubClass) {
+ if (found == null) {
+ found = node;
+ } else {
+ throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+ }
+ }
+ }
+ if (found == null) {
+ throw new RuntimeException("No node found with the given name and stub class.");
+ } else {
+ return (T) found;
+ }
+ }
+ }
+
+ public List<PlanNode> getNodes(String name) {
+ List<PlanNode> nodes = this.map.get(name);
+ if (nodes == null || nodes.isEmpty()) {
+ throw new RuntimeException("No node found with the given name.");
+ } else {
+ return new ArrayList<PlanNode>(nodes);
+ }
+ }
+ }
+
+ /**
+ * Collects all data sources of a plan, so that statistics
+ * can be attached to them.
+ */
+ public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
+
+ protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+
+ if(visitable instanceof GenericDataSourceBase) {
+ sources.add((GenericDataSourceBase<?, ?>) visitable);
+ }
+ else if(visitable instanceof BulkIterationBase) {
+ ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void postVisit(Operator<?> visitable) {}
+
+ public List<GenericDataSourceBase<?, ?>> getSources() {
+ return this.sources;
+ }
+
+ }
+}
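
For orientation, a minimal sketch of how a test builds on this base class (illustrative only, not part of this commit; the class name "ExampleCompilerTest" and the operator names are made up, while all methods used are defined in the file above):

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.common.operators.GenericDataSourceBase;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.plan.SinkPlanNode;
    import org.junit.Test;

    import static org.junit.Assert.assertEquals;

    public class ExampleCompilerTest extends CompilerTestBase {

        @Test
        public void testSimplePlan() {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setDegreeOfParallelism(DEFAULT_PARALLELISM);

            // name the operators, so that the resolver can look up the matching plan nodes
            DataSet<Tuple2<String, Double>> data = env.readCsvFile(IN_FILE)
                    .types(String.class, Double.class).name("source");
            data.print().name("sink");

            Plan p = env.createProgramPlan();

            // attach cached statistics to all data sources, then compile with statistics
            SourceCollectorVisitor sourceCollector = new SourceCollectorVisitor();
            p.accept(sourceCollector);
            for (GenericDataSourceBase<?, ?> source : sourceCollector.getSources()) {
                setSourceStatistics(source, 1024L * 1024L * 1024L, 42.0f);
            }
            OptimizedPlan op = compileWithStats(p);

            // resolve an optimizer plan node by the operator name given above
            OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
            SinkPlanNode sinkNode = resolver.getNode("sink");
            assertEquals(DEFAULT_PARALLELISM, sinkNode.getParallelism());
        }
    }
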
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
new file mode 100644
index 0000000..b17e777
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ * <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
+ * parallelism between tasks is increased or decreased.
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class DOPChangeTest extends CompilerTestBase {
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
+ * Expected to re-establish the partitioning between reduce and map via hash, because a random
+ * repartitioning would require a full network transfer as well.
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setDegreeOfParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setDegreeOfParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setDegreeOfParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setDegreeOfParallelism(degOfPar * 2);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setDegreeOfParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setDegreeOfParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the twice as many instances of map2, it must hash
+ // partition the data, because key/value pairs with the same key need to be processed by the same
+ // mapper and, subsequently, by the same reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
+ }
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
+ * Expected to re-establish partitioning between map and reduce (hash).
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setDegreeOfParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setDegreeOfParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setDegreeOfParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setDegreeOfParallelism(degOfPar);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setDegreeOfParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setDegreeOfParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // reducer 1 forwards to map2 (same parallelism), but when map2 distributes its data across the
+ // instances of reducer 2, it must hash partition, because reducer 2 has twice as many instances
+ // and key/value pairs with the same key must be processed by the same reducer
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+ }
+
+ /**
+ * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+ *
+ * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
+ * Expected to re-establish partitioning between map and reduce via a local hash.
+ */
+ @Test
+ public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+ final int degOfPar = 2 * DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setDegreeOfParallelism(degOfPar);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setDegreeOfParallelism(degOfPar);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setDegreeOfParallelism(degOfPar);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setDegreeOfParallelism(degOfPar * 2);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setDegreeOfParallelism(degOfPar * 2);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setDegreeOfParallelism(degOfPar * 2);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when reducer 1 distributes its data across the twice as many instances of map2, the partitioning
+ // must be re-established: either directly via hash partitioning into map2, or via a random
+ // redistribution into map2 followed by hash partitioning into reducer 2
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+ ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+ Assert.assertTrue("Invalid ship strategy for an operator.",
+ (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
+ (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
+ }
+
+
+
+ @Test
+ public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
+ final int degOfPar = DEFAULT_PARALLELISM;
+
+ // construct the plan
+ FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+ source.setDegreeOfParallelism(degOfPar * 2);
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+ map1.setDegreeOfParallelism(degOfPar * 2);
+ map1.setInput(source);
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+ reduce1.setDegreeOfParallelism(degOfPar * 2);
+ reduce1.setInput(map1);
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+ map2.setDegreeOfParallelism(degOfPar);
+ map2.setInput(reduce1);
+
+ ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+ reduce2.setDegreeOfParallelism(degOfPar);
+ reduce2.setInput(map2);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+ sink.setDegreeOfParallelism(degOfPar);
+ sink.setInput(reduce2);
+
+ Plan plan = new Plan(sink, "Test Decreasing Degree Of Parallelism");
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ // when the parallelism drops from reducer 1 to map2, the existing hash partitioning is lost, so
+ // the data must be hash partitioned and sorted again on the way to reducer 2, either already
+ // into map2 or directly into reducer 2
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+ SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+ Assert.assertTrue("No sorting local strategy found.",
+ LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+ LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+ Assert.assertTrue("No hash partitioning ship strategy found.",
+ ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+ ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+ }
+
+ /**
+ * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
+ *
+ * Test Plan:
+ * <pre>
+ *
+ * (source) -> reduce -\
+ * Match -> (sink)
+ * (source) -> reduce -/
+ *
+ * </pre>
+ *
+ */
+ @Test
+ public void checkPropertyHandlingWithTwoInputs() {
+ // construct the plan
+
+ FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+ FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+ ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(sourceA)
+ .build();
+ ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(sourceB)
+ .build();
+
+ JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+ .input1(redA)
+ .input2(redB)
+ .build();
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
+
+ sourceA.setDegreeOfParallelism(5);
+ sourceB.setDegreeOfParallelism(7);
+ redA.setDegreeOfParallelism(5);
+ redB.setDegreeOfParallelism(7);
+
+ mat.setDegreeOfParallelism(5);
+
+ sink.setDegreeOfParallelism(5);
+
+
+ // construct the plan
+ Plan plan = new Plan(sink, "Partition on DoP Change");
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ JobGraphGenerator jobGen = new JobGraphGenerator();
+
+ // compile the plan to a job graph to verify that no error is thrown
+ jobGen.compileJobGraph(oPlan);
+
+ oPlan.accept(new Visitor<PlanNode>() {
+
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ if (visitable instanceof DualInputPlanNode) {
+ DualInputPlanNode node = (DualInputPlanNode) visitable;
+ Channel c1 = node.getInput1();
+ Channel c2 = node.getInput2();
+
+ Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
+ Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {
+ // DO NOTHING
+ }
+ });
+ }
+}
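
The anonymous Visitor used in checkPropertyHandlingWithTwoInputs generalizes to other plan-wide checks. As a minimal sketch (not part of this commit; the class name is made up, and it assumes PlanNode#getInputs() returns the node's input Channels), a reusable visitor that collects every channel's ship strategy:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.optimizer.plan.Channel;
    import org.apache.flink.optimizer.plan.PlanNode;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
    import org.apache.flink.util.Visitor;

    public class ShipStrategyCollector implements Visitor<PlanNode> {

        private final List<ShipStrategyType> strategies = new ArrayList<ShipStrategyType>();

        @Override
        public boolean preVisit(PlanNode visitable) {
            // record the ship strategy of every input channel of this node
            for (Channel c : visitable.getInputs()) {
                strategies.add(c.getShipStrategy());
            }
            return true; // keep descending towards the sources
        }

        @Override
        public void postVisit(PlanNode visitable) {}

        public List<ShipStrategyType> getStrategies() {
            return strategies;
        }
    }

Applied as "oPlan.accept(new ShipStrategyCollector())", this yields the strategies of all channels in one pass, for example to assert that a parallelism change introduced exactly one PARTITION_HASH.
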
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..aaee975
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+ @Test
+ public void testDisjointFlows() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // generate two different flows
+ env.generateSequence(1, 10).print();
+ env.generateSequence(1, 10).print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
new file mode 100644
index 0000000..34aa9f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+ @Test
+ public void testDistinctPlain() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .distinct().name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that reduce and combiner use the corresponding sort-based strategies
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDistinctWithSelectorFunctionKey() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .distinct(new KeySelector<Tuple2<String,Double>, String>() {
+ public String getKey(Tuple2<String, Double> value) { return value.f0; }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // get the key extractors and projectors
+ SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+ SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, keyExtractor.getInput().getSource());
+ assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+ // check that reduce and combiner use the corresponding sort-based strategies
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDistinctWithFieldPositionKeyCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ DistinctOperator<Tuple2<String, Double>> reduced = data
+ .distinct(1).name("reducer");
+
+ reduced.print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that reduce and combiner use the corresponding sort-based strategies
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
+ assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+}
\ No newline at end of file