Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:00 UTC

[21/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
deleted file mode 100644
index 8720aa7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.java;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that request specific execution strategies.
-*/
-@SuppressWarnings("serial")
-public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
-	
-	private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
-	private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
-	private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
-	private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";
-
-	@Test
-	public void testJavaApiWithDeferredSolutionSetUpdateWithMapper() {
-		try {
-			Plan plan = getJavaTestPlan(false, true);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-	
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			// currently, the system may partition before or after the mapper
-			ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
-			ShipStrategyType ss2 = deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
-			
-			assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
-						(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
-		
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testJavaApiWithDeferredSolutionSetUpdateWithNonPreservingJoin() {
-		try {
-			Plan plan = getJavaTestPlan(false, false);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			// verify solution delta
-			assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
-			
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testJavaApiWithDirectSolutionSetUpdate() {
-		try {
-			Plan plan = getJavaTestPlan(true, false);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-	
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			
-			// verify solution delta
-			assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
-			
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	
-	@Test
-	public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-			
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-			
-			DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-			
-			
-			DataSet<Tuple3<Long, Long, Long>> result = 
-			
-			iter.getWorkset().join(invariantInput)
-				.where(1, 2)
-				.equalTo(1, 2)
-				.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
-					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-						return first;
-					}
-				});
-			
-			try {
-			result.join(iter.getSolutionSet())
-				.where(1, 0)
-				.equalTo(0, 2)
-				.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
-					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-						return second;
-					}
-				});
-				fail("The join should have been rejected because the join keys do not match the solution set keys.");
-			}
-			catch (InvalidProgramException e) {
-				// expected!
-			}
-			
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-		
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-		
-		DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-		
-		
-		DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet = 
-		
-		iter.getWorkset().join(invariantInput)
-			.where(1, 2)
-			.equalTo(1, 2)
-			.with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
-				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-					return first;
-				}
-			})
-			.name(JOIN_WITH_INVARIANT_NAME)
-		
-		.join(iter.getSolutionSet())
-			.where(1, 0)
-			.equalTo(1, 2)
-			.with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
-				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-					return second;
-				}
-			})
-			.name(JOIN_WITH_SOLUTION_SET)
-			.withForwardedFieldsSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
-			
-		DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
-			.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
-				public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
-			})
-			.name(NEXT_WORKSET_REDUCER_NAME)
-			.withForwardedFields("1->1","2->2","0->0");
-		
-		
-		DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
-				joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
-					.name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") :
-				joinedWithSolutionSet;
-		
-		iter.closeWith(nextSolutionSet, nextWorkset)
-			.print();
-		
-		return env.createProgramPlan();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
deleted file mode 100644
index 23f8897..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CoGroupGlobalPropertiesCompatibilityTest {
-
-	@Test
-	public void checkCompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1, 4);
-			final FieldList keysRight = new FieldList(3, 1);
-			
-			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-			
-			// test compatible hash partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setHashPartitioned(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setHashPartitioned(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setHashPartitioned(keysRight);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test compatible custom partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setCustomPartitioned(keysLeft, part);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setCustomPartitioned(keysRight, part);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test custom partitioning matching any partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void checkIncompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1);
-			final FieldList keysRight = new FieldList(3);
-			
-			final Partitioner<Object> part = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			final Partitioner<Object> part2 = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			
-			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-			
-			// test incompatible hash with custom partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test incompatible custom partitionings
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part2);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
deleted file mode 100644
index e7807c9..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
-
-	@Test
-	public void testRejectCoGroupOnHashAndRangePartitioning() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-			
-			Configuration cfg = new Configuration();
-			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
-			
-			input.coGroup(input).where(0).equalTo(0)
-				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-				.withParameters(cfg)
-				.print();
-			
-			Plan p = env.createProgramPlan();
-			try {
-				compileNoStats(p);
-				fail("This should fail with an exception");
-			}
-			catch (CompilerException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
deleted file mode 100644
index 839f0a1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class JoinGlobalPropertiesCompatibilityTest {
-
-	@Test
-	public void checkCompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1, 4);
-			final FieldList keysRight = new FieldList(3, 1);
-			
-			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
-			
-			// test compatible hash partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setHashPartitioned(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setHashPartitioned(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setHashPartitioned(keysRight);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test compatible custom partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setCustomPartitioned(keysLeft, part);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setCustomPartitioned(keysRight, part);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test custom partitioning matching any partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void checkIncompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1);
-			final FieldList keysRight = new FieldList(3);
-			
-			final Partitioner<Object> part = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			final Partitioner<Object> part2 = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			
-			SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
-			
-			// test incompatible hash with custom partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test incompatible custom partitionings
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part2);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
deleted file mode 100644
index 9171cc7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
-
-	@Test
-	public void testRejectJoinOnHashAndRangePartitioning() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-			
-			Configuration cfg = new Configuration();
-			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
-			
-			input.join(input).where(0).equalTo(0)
-				.withParameters(cfg)
-				.print();
-			
-			Plan p = env.createProgramPlan();
-			try {
-				compileNoStats(p);
-				fail("This should fail with an exception");
-			}
-			catch (CompilerException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
deleted file mode 100644
index 2c1574b..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-public class ChannelTest {
-	
-	@Test
-	public void testGetEstimatesNoReplicationFactor() {
-		final long NUM_RECORD = 1001;
-		final long SIZE = 467131;
-		
-		DataSourceNode source = getSourceNode();
-		SourcePlanNode planNode = new SourcePlanNode(source, "test node");
-		Channel channel = new Channel(planNode);
-
-		// no estimates here
-		Assert.assertEquals(-1, channel.getEstimatedOutputSize());
-		Assert.assertEquals(-1, channel.getEstimatedNumRecords());
-		
-		// set estimates
-		source.setEstimatedNumRecords(NUM_RECORD);
-		source.setEstimatedOutputSize(SIZE);
-		Assert.assertEquals(SIZE, channel.getEstimatedOutputSize());
-		Assert.assertEquals(NUM_RECORD, channel.getEstimatedNumRecords());
-	}
-	
-	@Test
-	public void testGetEstimatesWithReplicationFactor() {
-		final long NUM_RECORD = 1001;
-		final long SIZE = 467131;
-		
-		final int REPLICATION = 23;
-		
-		DataSourceNode source = getSourceNode();
-		SourcePlanNode planNode = new SourcePlanNode(source, "test node");
-		Channel channel = new Channel(planNode);
-		channel.setReplicationFactor(REPLICATION);
-
-		// no estimates here
-		Assert.assertEquals(-1, channel.getEstimatedOutputSize());
-		Assert.assertEquals(-1, channel.getEstimatedNumRecords());
-		
-		// set estimates
-		source.setEstimatedNumRecords(NUM_RECORD);
-		source.setEstimatedOutputSize(SIZE);
-		Assert.assertEquals(SIZE * REPLICATION, channel.getEstimatedOutputSize());
-		Assert.assertEquals(NUM_RECORD * REPLICATION, channel.getEstimatedNumRecords());
-	}
-	
-	
-//	private static final OptimizerNode getSingleInputNode() {
-//		return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(
-//				new IdentityMapper<String>(),
-//				new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
-//				"map"));
-//	}
-	
-	private static final DataSourceNode getSourceNode() {
-		return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
-				new TextInputFormat(new Path("/ignored")), 
-				new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
-				"source"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
deleted file mode 100644
index 366d10d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plandump;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class NumberFormattingTest {
-
-	@Test
-	public void testFormatNumberNoDigit() {
-		assertEquals("0.0", PlanJSONDumpGenerator.formatNumber(0));
-		assertEquals("0.00", PlanJSONDumpGenerator.formatNumber(0.0000000001));
-		assertEquals("-1.0", PlanJSONDumpGenerator.formatNumber(-1.0));
-		assertEquals("1.00", PlanJSONDumpGenerator.formatNumber(1));
-		assertEquals("17.00", PlanJSONDumpGenerator.formatNumber(17));
-		assertEquals("17.44", PlanJSONDumpGenerator.formatNumber(17.44));
-		assertEquals("143.00", PlanJSONDumpGenerator.formatNumber(143));
-		assertEquals("143.40", PlanJSONDumpGenerator.formatNumber(143.4));
-		assertEquals("143.50", PlanJSONDumpGenerator.formatNumber(143.5));
-		assertEquals("143.60", PlanJSONDumpGenerator.formatNumber(143.6));
-		assertEquals("143.45", PlanJSONDumpGenerator.formatNumber(143.45));
-		assertEquals("143.55", PlanJSONDumpGenerator.formatNumber(143.55));
-		assertEquals("143.65", PlanJSONDumpGenerator.formatNumber(143.65));
-		assertEquals("143.66", PlanJSONDumpGenerator.formatNumber(143.655));
-		
-		assertEquals("1.13 K", PlanJSONDumpGenerator.formatNumber(1126.0));
-		assertEquals("11.13 K", PlanJSONDumpGenerator.formatNumber(11126.0));
-		assertEquals("118.13 K", PlanJSONDumpGenerator.formatNumber(118126.0));
-
-		assertEquals("1.44 M", PlanJSONDumpGenerator.formatNumber(1435126.0));
-	}
-	
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
deleted file mode 100644
index 7fea8a6..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class DummyCoGroupFunction<L, R> extends RichCoGroupFunction<L, R, Tuple2<L, R>> {
-	
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void coGroup(Iterable<L> first, Iterable<R> second, Collector<Tuple2<L, R>> out) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
deleted file mode 100644
index 6be8a24..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.util.Collector;
-
-public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(T first, T second, Collector<T> out) {
-		out.collect(null);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
deleted file mode 100644
index 44d3695..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-
-public class DummyReducer<T> extends RichReduceFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public T reduce(T a, T b) {
-		return a;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
deleted file mode 100644
index 0316463..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void flatMap(T value, Collector<T> out) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
deleted file mode 100644
index 11fd044..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.util.Collector;
-
-
-@Combinable
-public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void reduce(Iterable<T> values, Collector<T> out) {
-		for (T next : values) {
-			out.collect(next);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
deleted file mode 100644
index f335846..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public T getKey(T value) {
-		return value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
deleted file mode 100644
index 025b4d8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-
-public class IdentityMapper<T> extends RichMapFunction<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public T map(T value) {
-		return value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
deleted file mode 100644
index 6efbef1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-
-public class IdentityPartitionerMapper<T> extends RichMapPartitionFunction<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void mapPartition(Iterable<T> values, Collector<T> out) {
-		for (T in : values) {
-			out.collect(in);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
deleted file mode 100644
index 39c0e1b..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-
-public class SelectOneReducer<T> extends RichReduceFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public T reduce(T value1, T value2) throws Exception {
-		return value1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
deleted file mode 100644
index 48d13ca..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.testfunctions;
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
-import org.apache.flink.util.Collector;
-
-
-@Combinable
-public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void reduce(Iterable<T> values, Collector<T> out) {
-		out.collect(values.iterator().next());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-		while (records1.hasNext()) {
-			out.collect(records1.next());
-		}
-
-		while (records2.hasNext()) {
-			out.collect(records2.next());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Record cross(Record first, Record second) throws Exception {
-		return first;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue integer = new IntValue(1);
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		target.setField(0, this.integer);
-		target.setField(1, this.integer);
-		return target;
-	}
-
-	@Override
-	public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void map(Record record, Collector<Record> out) throws Exception {
-		out.collect(record);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-		while (records.hasNext()) {
-			out.collect(records.next());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/log4j-test.properties b/flink-compiler/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-compiler/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/log4j.properties b/flink-compiler/src/test/resources/log4j.properties
deleted file mode 100644
index fa3f937..0000000
--- a/flink-compiler/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target  = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/logback-test.xml b/flink-compiler/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-compiler/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 7ee7f25..b826c45 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -61,7 +61,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-compiler</artifactId>
+			<artifactId>flink-optimizer</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/pom.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml
new file mode 100644
index 0000000..55764e9
--- /dev/null
+++ b/flink-optimizer/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-optimizer</artifactId>
+	<name>flink-optimizer</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
new file mode 100644
index 0000000..2f99ddb
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerException.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+/**
+ * An exception that is thrown by the Optimizer when encountering an illegal condition.
+ */
+public class CompilerException extends RuntimeException {
+
+	private static final long serialVersionUID = 3810067304570563755L;
+
+	/**
+	 * Creates a compiler exception with no message and no cause.
+	 */
+	public CompilerException() {}
+
+	/**
+	 * Creates a compiler exception with the given message and no cause.
+	 * 
+	 * @param message
+	 *        The message for the exception.
+	 */
+	public CompilerException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a compiler exception with the given cause and no message.
+	 * 
+	 * @param cause
+	 *        The <tt>Throwable</tt> that caused this exception.
+	 */
+	public CompilerException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a compiler exception with the given message and cause.
+	 * 
+	 * @param message
+	 *        The message for the exception.
+	 * @param cause
+	 *        The <tt>Throwable</tt> that caused this exception.
+	 */
+	public CompilerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
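
Editor's note: the constructors shown in the hunk above are the whole public surface of the new CompilerException. A minimal usage sketch follows; the validateParallelism helper and the sketch class are hypothetical, only CompilerException itself comes from this commit, and it simply needs the flink-optimizer jar on the classpath.

	import org.apache.flink.optimizer.CompilerException;

	public class CompilerExceptionSketch {

		// Hypothetical validation step; only CompilerException is part of this commit.
		private static void validateParallelism(int parallelism) {
			if (parallelism <= 0) {
				// Report the illegal condition the way the optimizer would.
				throw new CompilerException("Parallelism must be positive, but was " + parallelism);
			}
		}

		public static void main(String[] args) {
			try {
				validateParallelism(-1);
			} catch (CompilerException e) {
				// CompilerException extends RuntimeException, so catching it is optional.
				System.err.println("Optimizer rejected the program: " + e.getMessage());
			}
		}
	}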

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
new file mode 100644
index 0000000..78e47a0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+/**
+ * An exception that is thrown by the Optimizer when encountering
+ * a problem during the optimizer post pass. This is a dedicated exception
+ * because it is thrown by user-specified optimizer extensions.
+ */
+public class CompilerPostPassException extends CompilerException {
+
+	private static final long serialVersionUID = -322650826288034623L;
+
+	/**
+	 * Creates a post pass exception with no message and no cause.
+	 */
+	public CompilerPostPassException() {}
+
+	/**
+	 * Creates a post pass exception with the given message and no cause.
+	 * 
+	 * @param message The message for the exception.
+	 */
+	public CompilerPostPassException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a post pass exception with the given cause and no message.
+	 * 
+	 * @param cause The <tt>Throwable</tt> that caused this exception.
+	 */
+	public CompilerPostPassException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a post pass exception with the given message and cause.
+	 * 
+	 * @param message The message for the exception.
+	 * @param cause The <tt>Throwable</tt> that caused this exception.
+	 */
+	public CompilerPostPassException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
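
Editor's note: because CompilerPostPassException extends CompilerException (as the hunk above shows), existing handlers that already guard against optimizer failures also catch post-pass errors. A minimal sketch under that assumption; the checkSerializerAssigned helper and the sketch class are hypothetical, only the two exception types come from this commit.

	import org.apache.flink.optimizer.CompilerException;
	import org.apache.flink.optimizer.CompilerPostPassException;

	public class PostPassExceptionSketch {

		// Hypothetical post-pass check; only the exception types are part of this commit.
		private static void checkSerializerAssigned(Object serializer) {
			if (serializer == null) {
				throw new CompilerPostPassException("Post pass found an operator without a serializer.");
			}
		}

		public static void main(String[] args) {
			try {
				checkSerializerAssigned(null);
			} catch (CompilerException e) {
				// The post-pass exception is caught here too, since it is a CompilerException subtype.
				System.err.println("Post pass failed: " + e.getMessage());
			}
		}
	}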