You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:07 UTC

[28/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
deleted file mode 100644
index 565d992..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Visitor;
-import org.junit.Before;
-
-/**
- * Base class for Optimizer tests. Offers utility methods to trigger optimization
- * of a program and to fetch the nodes in an optimizer plan that correspond
- * the the node in the program plan.
- */
-public abstract class CompilerTestBase implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-
-	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
-	
-	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
-	
-	protected static final int DEFAULT_PARALLELISM = 8;
-	
-	private static final String CACHE_KEY = "cachekey";
-	
-	// ------------------------------------------------------------------------
-	
-	protected transient DataStatistics dataStats;
-	
-	protected transient Optimizer withStatsCompiler;
-	
-	protected transient Optimizer noStatsCompiler;
-	
-	private transient int statCounter;
-	
-	// ------------------------------------------------------------------------	
-	
-	@Before
-	public void setup() {
-		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
-		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-		
-		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
-		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public OptimizedPlan compileWithStats(Plan p) {
-		return this.withStatsCompiler.compile(p);
-	}
-	
-	public OptimizedPlan compileNoStats(Plan p) {
-		return this.noStatsCompiler.compile(p);
-	}
-	
-	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
-		setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
-	}
-	
-	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
-		final String key = CACHE_KEY + this.statCounter++;
-		this.dataStats.cacheBaseStatistics(stats, key);
-		source.setStatisticsKey(key);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
-		return new OptimizerPlanNodeResolver(plan);
-	}
-	
-	public static final class OptimizerPlanNodeResolver {
-		
-		private final Map<String, ArrayList<PlanNode>> map;
-		
-		OptimizerPlanNodeResolver(OptimizedPlan p) {
-			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
-			
-			for (PlanNode n : p.getAllNodes()) {
-				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
-				String name = c.getName();
-				
-				ArrayList<PlanNode> list = map.get(name);
-				if (list == null) {
-					list = new ArrayList<PlanNode>(2);
-					map.put(name, list);
-				}
-				
-				// check whether this node is a child of a node with the same contract (aka combiner)
-				boolean shouldAdd = true;
-				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
-					PlanNode in = iter.next();
-					if (in.getOriginalOptimizerNode().getOperator() == c) {
-						// is this the child or is our node the child
-						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
-							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
-							SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
-							
-							if (thisNode.getPredecessor() == otherNode) {
-								// other node is child, remove it
-								iter.remove();
-							} else if (otherNode.getPredecessor() == thisNode) {
-								shouldAdd = false;
-							}
-						} else {
-							throw new RuntimeException("Unrecodnized case in test.");
-						}
-					}
-				}
-				
-				if (shouldAdd) {
-					list.add(n);
-				}
-			}
-			
-			this.map = map;
-		}
-		
-		
-		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name.");
-			} else if (nodes.size() != 1) {
-				throw new RuntimeException("Multiple nodes found with the given name.");
-			} else {
-				return (T) nodes.get(0);
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name and stub class.");
-			} else {
-				PlanNode found = null;
-				for (PlanNode node : nodes) {
-					if (node.getClass() == stubClass) {
-						if (found == null) {
-							found = node;
-						} else {
-							throw new RuntimeException("Multiple nodes found with the given name and stub class.");
-						}
-					}
-				}
-				if (found == null) {
-					throw new RuntimeException("No node found with the given name and stub class.");
-				} else {
-					return (T) found;
-				}
-			}
-		}
-		
-		public List<PlanNode> getNodes(String name) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name.");
-			} else {
-				return new ArrayList<PlanNode>(nodes);
-			}
-		}
-	}
-
-	/**
-	 * Collects all DataSources of a plan to add statistics
-	 *
-	 */
-	public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
-		
-		protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
-
-		@Override
-		public boolean preVisit(Operator<?> visitable) {
-			
-			if(visitable instanceof GenericDataSourceBase) {
-				sources.add((GenericDataSourceBase<?, ?>) visitable);
-			}
-			else if(visitable instanceof BulkIterationBase) {
-				((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
-			}
-			
-			return true;
-		}
-
-		@Override
-		public void postVisit(Operator<?> visitable) {}
-		
-		public List<GenericDataSourceBase<?, ?>> getSources() {
-			return this.sources;
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
deleted file mode 100644
index b17e777..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.Visitor;
-import org.junit.Test;
-
-/**
- * Tests in this class:
- * <ul>
- *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- *       parallelism between tasks is increased or decreased.
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class DOPChangeTest extends CompilerTestBase {
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
-	 * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
-	 * transit as well.
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
-	}
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
-	 * Expected to re-establish partitioning between map and reduce (hash).
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
-	}
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
-	 * Expected to re-establish partitioning between map and reduce via a local hash.
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
-		final int degOfPar = 2 * DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertTrue("Invalid ship strategy for an operator.", 
-				(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
-				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
-	}
-	
-	
-	
-	@Test
-	public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar * 2);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar * 2);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar * 2);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
-		Assert.assertTrue("The no sorting local strategy.",
-				LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
-						LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
-
-		Assert.assertTrue("The no partitioning ship strategy.",
-				ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
-						ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
-	}
-
-	/**
-	 * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
-	 * 
-	 * Test Plan:
-	 * <pre>
-	 * 
-	 * (source) -> reduce -\
-	 *                      Match -> (sink)
-	 * (source) -> reduce -/
-	 * 
-	 * </pre>
-	 * 
-	 */
-	@Test
-	public void checkPropertyHandlingWithTwoInputs() {
-		// construct the plan
-
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-			.input1(redA)
-			.input2(redB)
-			.build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-		
-		sourceA.setDegreeOfParallelism(5);
-		sourceB.setDegreeOfParallelism(7);
-		redA.setDegreeOfParallelism(5);
-		redB.setDegreeOfParallelism(7);
-		
-		mat.setDegreeOfParallelism(5);
-		
-		sink.setDegreeOfParallelism(5);
-		
-		
-		// return the PACT plan
-		Plan plan = new Plan(sink, "Partition on DoP Change");
-		
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		JobGraphGenerator jobGen = new JobGraphGenerator();
-		
-		//Compile plan to verify that no error is thrown
-		jobGen.compileJobGraph(oPlan);
-		
-		oPlan.accept(new Visitor<PlanNode>() {
-			
-			@Override
-			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof DualInputPlanNode) {
-					DualInputPlanNode node = (DualInputPlanNode) visitable;
-					Channel c1 = node.getInput1();
-					Channel c2 = node.getInput2();
-					
-					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
-					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
-					return false;
-				}
-				return true;
-			}
-			
-			@Override
-			public void postVisit(PlanNode visitable) {
-				// DO NOTHING
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
deleted file mode 100644
index aaee975..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class DisjointDataFlowsTest extends CompilerTestBase {
-
-	@Test
-	public void testDisjointFlows() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			// generate two different flows
-			env.generateSequence(1, 10).print();
-			env.generateSequence(1, 10).print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
deleted file mode 100644
index 34aa9f8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.DistinctOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
-
-	@Test
-	public void testDistinctPlain() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-					.name("source").setParallelism(6);
-
-			data
-					.distinct().name("reducer")
-					.print().name("sink");
-
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-			// check the keys
-			assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
-			assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
-			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
-
-			// check DOP
-			assertEquals(6, sourceNode.getParallelism());
-			assertEquals(6, combineNode.getParallelism());
-			assertEquals(8, reduceNode.getParallelism());
-			assertEquals(8, sinkNode.getParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-
-	@Test
-	public void testDistinctWithSelectorFunctionKey() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-					.name("source").setParallelism(6);
-
-			data
-					.distinct(new KeySelector<Tuple2<String,Double>, String>() {
-						public String getKey(Tuple2<String, Double> value) { return value.f0; }
-					}).name("reducer")
-					.print().name("sink");
-
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-			// get the key extractors and projectors
-			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
-			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-
-			// check wiring
-			assertEquals(sourceNode, keyExtractor.getInput().getSource());
-			assertEquals(keyProjector, sinkNode.getInput().getSource());
-
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys(0));
-			assertEquals(new FieldList(0), combineNode.getKeys(0));
-			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-
-			// check DOP
-			assertEquals(6, sourceNode.getParallelism());
-			assertEquals(6, keyExtractor.getParallelism());
-			assertEquals(6, combineNode.getParallelism());
-
-			assertEquals(8, reduceNode.getParallelism());
-			assertEquals(8, keyProjector.getParallelism());
-			assertEquals(8, sinkNode.getParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-
-	@Test
-	public void testDistinctWithFieldPositionKeyCombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-					.name("source").setParallelism(6);
-
-			DistinctOperator<Tuple2<String, Double>> reduced = data
-					.distinct(1).name("reducer");
-
-			reduced.print().name("sink");
-
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys(0));
-			assertEquals(new FieldList(1), combineNode.getKeys(0));
-			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-
-			// check DOP
-			assertEquals(6, sourceNode.getParallelism());
-			assertEquals(6, combineNode.getParallelism());
-			assertEquals(8, reduceNode.getParallelism());
-			assertEquals(8, sinkNode.getParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-}
\ No newline at end of file