You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:47 UTC

[08/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
new file mode 100644
index 0000000..565d992
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * the the node in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+
+	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
+	
+	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
+	
+	protected static final int DEFAULT_PARALLELISM = 8;
+	
+	private static final String CACHE_KEY = "cachekey";
+	
+	// ------------------------------------------------------------------------
+	
+	protected transient DataStatistics dataStats;
+	
+	protected transient Optimizer withStatsCompiler;
+	
+	protected transient Optimizer noStatsCompiler;
+	
+	private transient int statCounter;
+	
+	// ------------------------------------------------------------------------	
+	
+	@Before
+	public void setup() {
+		this.dataStats = new DataStatistics();
+		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
+		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+		
+		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
+		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	public OptimizedPlan compileWithStats(Plan p) {
+		return this.withStatsCompiler.compile(p);
+	}
+	
+	public OptimizedPlan compileNoStats(Plan p) {
+		return this.noStatsCompiler.compile(p);
+	}
+	
+	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
+		setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
+	}
+	
+	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
+		final String key = CACHE_KEY + this.statCounter++;
+		this.dataStats.cacheBaseStatistics(stats, key);
+		source.setStatisticsKey(key);
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+		return new OptimizerPlanNodeResolver(plan);
+	}
+	
+	public static final class OptimizerPlanNodeResolver {
+		
+		private final Map<String, ArrayList<PlanNode>> map;
+		
+		OptimizerPlanNodeResolver(OptimizedPlan p) {
+			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
+			
+			for (PlanNode n : p.getAllNodes()) {
+				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
+				String name = c.getName();
+				
+				ArrayList<PlanNode> list = map.get(name);
+				if (list == null) {
+					list = new ArrayList<PlanNode>(2);
+					map.put(name, list);
+				}
+				
+				// check whether this node is a child of a node with the same contract (aka combiner)
+				boolean shouldAdd = true;
+				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
+					PlanNode in = iter.next();
+					if (in.getOriginalOptimizerNode().getOperator() == c) {
+						// is this the child or is our node the child
+						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
+							SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
+							
+							if (thisNode.getPredecessor() == otherNode) {
+								// other node is child, remove it
+								iter.remove();
+							} else if (otherNode.getPredecessor() == thisNode) {
+								shouldAdd = false;
+							}
+						} else {
+							throw new RuntimeException("Unrecodnized case in test.");
+						}
+					}
+				}
+				
+				if (shouldAdd) {
+					list.add(n);
+				}
+			}
+			
+			this.map = map;
+		}
+		
+		
+		@SuppressWarnings("unchecked")
+		public <T extends PlanNode> T getNode(String name) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name.");
+			} else if (nodes.size() != 1) {
+				throw new RuntimeException("Multiple nodes found with the given name.");
+			} else {
+				return (T) nodes.get(0);
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name and stub class.");
+			} else {
+				PlanNode found = null;
+				for (PlanNode node : nodes) {
+					if (node.getClass() == stubClass) {
+						if (found == null) {
+							found = node;
+						} else {
+							throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+						}
+					}
+				}
+				if (found == null) {
+					throw new RuntimeException("No node found with the given name and stub class.");
+				} else {
+					return (T) found;
+				}
+			}
+		}
+		
+		public List<PlanNode> getNodes(String name) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name.");
+			} else {
+				return new ArrayList<PlanNode>(nodes);
+			}
+		}
+	}
+
+	/**
+	 * Collects all DataSources of a plan to add statistics
+	 *
+	 */
+	public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
+		
+		protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+		@Override
+		public boolean preVisit(Operator<?> visitable) {
+			
+			if(visitable instanceof GenericDataSourceBase) {
+				sources.add((GenericDataSourceBase<?, ?>) visitable);
+			}
+			else if(visitable instanceof BulkIterationBase) {
+				((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
+			}
+			
+			return true;
+		}
+
+		@Override
+		public void postVisit(Operator<?> visitable) {}
+		
+		public List<GenericDataSourceBase<?, ?>> getSources() {
+			return this.sources;
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
new file mode 100644
index 0000000..b17e777
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
+ *       parallelism between tasks is increased or decreased.
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class DOPChangeTest extends CompilerTestBase {
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
+	 * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
+	 * transit as well.
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setDegreeOfParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setDegreeOfParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setDegreeOfParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setDegreeOfParallelism(degOfPar * 2);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setDegreeOfParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setDegreeOfParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+		// mapper respectively reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
+	}
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
+	 * Expected to re-establish partitioning between map and reduce (hash).
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setDegreeOfParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setDegreeOfParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setDegreeOfParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setDegreeOfParallelism(degOfPar);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setDegreeOfParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setDegreeOfParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+		// mapper respectively reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+	}
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
+	 * Expected to re-establish partitioning between map and reduce via a local hash.
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+		final int degOfPar = 2 * DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setDegreeOfParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setDegreeOfParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setDegreeOfParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setDegreeOfParallelism(degOfPar * 2);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setDegreeOfParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setDegreeOfParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+		// mapper respectively reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertTrue("Invalid ship strategy for an operator.", 
+				(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
+				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
+	}
+	
+	
+	
+	@Test
+	public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setDegreeOfParallelism(degOfPar * 2);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setDegreeOfParallelism(degOfPar * 2);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setDegreeOfParallelism(degOfPar * 2);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setDegreeOfParallelism(degOfPar);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setDegreeOfParallelism(degOfPar);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setDegreeOfParallelism(degOfPar);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+		// mapper respectively reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+		Assert.assertTrue("The no sorting local strategy.",
+				LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+						LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+		Assert.assertTrue("The no partitioning ship strategy.",
+				ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+						ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+	}
+
+	/**
+	 * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
+	 * 
+	 * Test Plan:
+	 * <pre>
+	 * 
+	 * (source) -> reduce -\
+	 *                      Match -> (sink)
+	 * (source) -> reduce -/
+	 * 
+	 * </pre>
+	 * 
+	 */
+	@Test
+	public void checkPropertyHandlingWithTwoInputs() {
+		// construct the plan
+
+		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+		
+		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+			.input(sourceA)
+			.build();
+		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+			.input(sourceB)
+			.build();
+		
+		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+			.input1(redA)
+			.input2(redB)
+			.build();
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
+		
+		sourceA.setDegreeOfParallelism(5);
+		sourceB.setDegreeOfParallelism(7);
+		redA.setDegreeOfParallelism(5);
+		redB.setDegreeOfParallelism(7);
+		
+		mat.setDegreeOfParallelism(5);
+		
+		sink.setDegreeOfParallelism(5);
+		
+		
+		// return the PACT plan
+		Plan plan = new Plan(sink, "Partition on DoP Change");
+		
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		JobGraphGenerator jobGen = new JobGraphGenerator();
+		
+		//Compile plan to verify that no error is thrown
+		jobGen.compileJobGraph(oPlan);
+		
+		oPlan.accept(new Visitor<PlanNode>() {
+			
+			@Override
+			public boolean preVisit(PlanNode visitable) {
+				if (visitable instanceof DualInputPlanNode) {
+					DualInputPlanNode node = (DualInputPlanNode) visitable;
+					Channel c1 = node.getInput1();
+					Channel c2 = node.getInput2();
+					
+					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
+					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
+					return false;
+				}
+				return true;
+			}
+			
+			@Override
+			public void postVisit(PlanNode visitable) {
+				// DO NOTHING
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..aaee975
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+	@Test
+	public void testDisjointFlows() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// generate two different flows
+			env.generateSequence(1, 10).print();
+			env.generateSequence(1, 10).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
new file mode 100644
index 0000000..34aa9f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+	@Test
+	public void testDistinctPlain() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			data
+					.distinct().name("reducer")
+					.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDistinctWithSelectorFunctionKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			data
+					.distinct(new KeySelector<Tuple2<String,Double>, String>() {
+						public String getKey(Tuple2<String, Double> value) { return value.f0; }
+					}).name("reducer")
+					.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDistinctWithFieldPositionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			DistinctOperator<Tuple2<String, Double>> reduced = data
+					.distinct(1).name("reducer");
+
+			reduced.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+}
\ No newline at end of file