You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:19 UTC

[5/7] flink git commit: [tests] Rename 'compiler' tests to 'optimizer' tests for consistent naming

[tests] Rename 'compiler' tests to 'optimizer' tests for consistent naming


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09fdfda7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09fdfda7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09fdfda7

Branch: refs/heads/master
Commit: 09fdfda7f25cf95426bc43ca33ed7bb39c7d353a
Parents: 6eae11f
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 18 11:11:13 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 18 16:37:41 2015 +0200

----------------------------------------------------------------------
 .../compiler/examples/KMeansSingleStepTest.java | 130 -------
 .../examples/RelationalQueryCompilerTest.java   | 351 -------------------
 .../examples/WordCountCompilerTest.java         | 184 ----------
 .../ConnectedComponentsCoGroupTest.java         | 137 --------
 .../iterations/IterativeKMeansTest.java         | 159 ---------
 ...ultipleJoinsWithSolutionSetCompilerTest.java | 142 --------
 .../iterations/PageRankCompilerTest.java        | 110 ------
 .../compiler/plandump/DumpCompiledPlanTest.java | 109 ------
 .../compiler/plandump/PreviewPlanDumpTest.java  | 105 ------
 .../MultipleSolutionSetJoinsITCase.java         |   2 +-
 .../examples/KMeansSingleStepTest.java          | 130 +++++++
 .../examples/RelationalQueryCompilerTest.java   | 351 +++++++++++++++++++
 .../examples/WordCountCompilerTest.java         | 184 ++++++++++
 .../ConnectedComponentsCoGroupTest.java         | 137 ++++++++
 .../iterations/IterativeKMeansTest.java         | 159 +++++++++
 ...ultipleJoinsWithSolutionSetCompilerTest.java | 142 ++++++++
 .../iterations/PageRankCompilerTest.java        | 110 ++++++
 .../jsonplan/DumpCompiledPlanTest.java          | 109 ++++++
 .../optimizer/jsonplan/PreviewPlanDumpTest.java | 105 ++++++
 19 files changed, 1428 insertions(+), 1428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
deleted file mode 100644
index ec532be..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class KMeansSingleStepTest extends CompilerTestBase {
-	
-	private static final String DATAPOINTS = "Data Points";
-	private static final String CENTERS = "Centers";
-	
-	private static final String MAPPER_NAME = "Find Nearest Centers";
-	private static final String REDUCER_NAME = "Recompute Center Positions";
-	
-	private static final String SINK = "New Center Positions";
-	
-	private final FieldList set0 = new FieldList(0);
-	
-	
-	@Test
-	public void testCompileKMeansSingleStepWithStats() {
-		
-		KMeansSingleStep kmi = new KMeansSingleStep();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		// set the statistics
-		OperatorResolver cr = getContractResolver(p);
-		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-		FileDataSource centersSource = cr.getNode(CENTERS);
-		setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-		setSourceStatistics(centersSource, 1024*1024, 32f);
-		
-		OptimizedPlan plan = compileWithStats(p);
-		checkPlan(plan);
-	}
-
-	@Test
-	public void testCompileKMeansSingleStepWithOutStats() {
-		
-		KMeansSingleStep kmi = new KMeansSingleStep();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		OptimizedPlan plan = compileNoStats(p);
-		checkPlan(plan);
-	}
-	
-	
-	private void checkPlan(OptimizedPlan plan) {
-		
-		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-		
-		final SinkPlanNode sink = or.getNode(SINK);
-		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
-		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-		
-		// check the mapper
-		assertEquals(1, mapper.getBroadcastInputs().size());
-		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-		
-		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
-		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-		
-		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-		
-		assertNull(mapper.getInput().getLocalStrategyKeys());
-		assertNull(mapper.getInput().getLocalStrategySortOrder());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-		
-		
-		// check the combiner
-		Assert.assertNotNull(combiner);
-		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-		assertNull(combiner.getInput().getLocalStrategyKeys());
-		assertNull(combiner.getInput().getLocalStrategySortOrder());
-		assertEquals(set0, combiner.getKeys(0));
-		assertEquals(set0, combiner.getKeys(1));
-		
-		// check the reducer
-		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		assertEquals(set0, reducer.getKeys(0));
-		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-		
-		// check the sink
-		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
deleted file mode 100644
index bc53810..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests TPCH Q3 (simplified) under various input conditions.
- */
-@SuppressWarnings("deprecation")
-public class RelationalQueryCompilerTest extends CompilerTestBase {
-	
-	private static final String ORDERS = "Orders";
-	private static final String LINEITEM = "LineItems";
-	private static final String MAPPER_NAME = "FilterO";
-	private static final String JOIN_NAME = "JoinLiO";
-	
-	private final FieldList set0 = new FieldList(0);
-	private final FieldList set01 = new FieldList(new int[] {0,1});
-	private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
-	
-	// ------------------------------------------------------------------------
-	
-	
-	/**
-	 * Verifies that a robust repartitioning plan with a hash join is created in the absence of statistics.
-	 */
-	@Test
-	public void testQueryNoStatistics() {
-		try {
-			TPCHQuery3 query = new TPCHQuery3();
-			Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
-			p.setExecutionConfig(defaultExecutionConfig);
-			// compile
-			final OptimizedPlan plan = compileNoStats(p);
-			
-			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-			
-			// get the nodes from the final plan
-			final SinkPlanNode sink = or.getNode("Output");
-			final SingleInputPlanNode reducer = or.getNode("AggLio");
-			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
-					(SingleInputPlanNode) reducer.getPredecessor() : null;
-			final DualInputPlanNode join = or.getNode("JoinLiO");
-			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
-			
-			// verify the optimizer choices
-			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
-			Assert.assertTrue(checkRepartitionShipStrategies(join, reducer, combiner));
-			Assert.assertTrue(checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, false));
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * Checks if any valid plan is produced. Hash joins are expected to build the orders side, as the statistics
-	 * indicate this to be the smaller one.
-	 */
-	@Test
-	public void testQueryAnyValidPlan() {
-		testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
-	}
-	
-	/**
-	 * Verifies that the plan compiles in the presence of empty size=0 estimates.
-	 */
-	@Test
-	public void testQueryWithSizeZeroInputs() {
-		testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
-	}
-	
-	/**
-	 * Statistics that push towards a broadcast join.
-	 */
-	@Test
-	public void testQueryWithStatsForBroadcastHash() {
-		testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false);
-	}
-	
-	/**
-	 * Statistics that push towards a repartition join (any local join strategy is acceptable).
-	 */
-	@Test
-	public void testQueryWithStatsForRepartitionAny() {
-		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
-	}
-	
-	/**
-	 * Statistics that push towards a repartition merge join. If the join blows the data volume up significantly,
-	 * re-exploiting the sorted order is cheaper.
-	 */
-	@Test
-	public void testQueryWithStatsForRepartitionMerge() {
-		TPCHQuery3 query = new TPCHQuery3();
-		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
-		p.setExecutionConfig(defaultExecutionConfig);
-		// set compiler hints
-		OperatorResolver cr = getContractResolver(p);
-		JoinOperator match = cr.getNode("JoinLiO");
-		match.getCompilerHints().setFilterFactor(100f);
-		
-		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private void testQueryGeneric(long orderSize, long lineItemSize, 
-			float ordersFilterFactor, 
-			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
-		testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
-	}
-	
-	private void testQueryGeneric(long orderSize, long lineItemSize, 
-			float ordersFilterFactor, float joinFilterFactor,
-			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
-		TPCHQuery3 query = new TPCHQuery3();
-		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
-		p.setExecutionConfig(defaultExecutionConfig);
-		testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
-	}
-		
-	private void testQueryGeneric(Plan p, long orderSize, long lineitemSize, 
-			float orderSelectivity, float joinSelectivity, 
-			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
-		try {
-			// set statistics
-			OperatorResolver cr = getContractResolver(p);
-			FileDataSource ordersSource = cr.getNode(ORDERS);
-			FileDataSource lineItemSource = cr.getNode(LINEITEM);
-			MapOperator mapper = cr.getNode(MAPPER_NAME);
-			JoinOperator joiner = cr.getNode(JOIN_NAME);
-			setSourceStatistics(ordersSource, orderSize, 100f);
-			setSourceStatistics(lineItemSource, lineitemSize, 140f);
-			mapper.getCompilerHints().setAvgOutputRecordSize(16f);
-			mapper.getCompilerHints().setFilterFactor(orderSelectivity);
-			joiner.getCompilerHints().setFilterFactor(joinSelectivity);
-			
-			// compile
-			final OptimizedPlan plan = compileWithStats(p);
-			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-			
-			// get the nodes from the final plan
-			final SinkPlanNode sink = or.getNode("Output");
-			final SingleInputPlanNode reducer = or.getNode("AggLio");
-			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
-					(SingleInputPlanNode) reducer.getPredecessor() : null;
-			final DualInputPlanNode join = or.getNode("JoinLiO");
-			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
-			
-			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
-			
-			// check the possible variants and that the variant is allowed in this specific setting
-			if (checkBroadcastShipStrategies(join, reducer, combiner)) {
-				Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);
-				
-				if (checkHashJoinStrategies(join, reducer, true)) {
-					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
-				} else if (checkHashJoinStrategies(join, reducer, false)) {
-					Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
-				} else if (checkBroadcastMergeJoin(join, reducer)) {
-					Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
-				} else {
-					Assert.fail("Plan has no correct hash join or merge join strategies.");
-				}
-			}
-			else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
-				Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);
-				
-				if (checkHashJoinStrategies(join, reducer, true)) {
-					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
-				} else if (checkHashJoinStrategies(join, reducer, false)) {
-					Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
-				} else if (checkRepartitionMergeJoin(join, reducer)) {
-					Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
-				} else {
-					Assert.fail("Plan has no correct hash join or merge join strategies.");
-				}
-			} else {
-				Assert.fail("Plan has neither correct BC join or partitioned join configuration.");
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checks for special conditions
-	// ------------------------------------------------------------------------
-	
-	private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
-			SingleInputPlanNode reducer, SinkPlanNode sink)
-	{
-		// check ship strategies that are always fixed
-		Assert.assertEquals(ShipStrategyType.FORWARD, map.getInput().getShipStrategy());
-		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		
-		// check the driver strategies that are always fixed
-		Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
-		Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
-		if (combiner != null) {
-			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
-		}
-	}
-	
-	private boolean checkBroadcastShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
-			SingleInputPlanNode combiner)
-	{
-		if (ShipStrategyType.BROADCAST == join.getInput1().getShipStrategy() &&
-			ShipStrategyType.FORWARD == join.getInput2().getShipStrategy() &&
-			ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy())
-		{
-			// check combiner
-			Assert.assertNotNull("Plan should have a combiner", combiner);
-			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	private boolean checkRepartitionShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
-			SingleInputPlanNode combiner)
-	{
-		if (ShipStrategyType.PARTITION_HASH == join.getInput1().getShipStrategy() &&
-			ShipStrategyType.PARTITION_HASH == join.getInput2().getShipStrategy() &&
-			ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy())
-		{
-			// check combiner
-			Assert.assertNull("Plan should not have a combiner", combiner);
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	private boolean checkHashJoinStrategies(DualInputPlanNode join, SingleInputPlanNode reducer, boolean buildFirst) {
-		if ( (buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
-			 (!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy()) ) 
-		{
-			// driver keys
-			Assert.assertEquals(set0, join.getKeysForInput1());
-			Assert.assertEquals(set0, join.getKeysForInput2());
-			
-			// local strategies
-			Assert.assertEquals(LocalStrategy.NONE, join.getInput1().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.NONE, join.getInput2().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-			
-			// local strategy keys
-			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
-			Assert.assertEquals(set01, reducer.getKeys(0));
-			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
-		if (DriverStrategy.MERGE == join.getDriverStrategy()) {
-			// driver keys
-			Assert.assertEquals(set0, join.getKeysForInput1());
-			Assert.assertEquals(set0, join.getKeysForInput2());
-			
-			// local strategies
-			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-			
-			// local strategy keys
-			Assert.assertEquals(set0, join.getInput1().getLocalStrategyKeys());
-			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
-			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
-			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
-			Assert.assertEquals(set01, reducer.getKeys(0));
-			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
-		if (DriverStrategy.MERGE == join.getDriverStrategy()) {
-			// driver keys
-			Assert.assertEquals(set0, join.getKeysForInput1());
-			Assert.assertEquals(set0, join.getKeysForInput2());
-			
-			// local strategies
-			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
-			Assert.assertEquals(LocalStrategy.NONE, reducer.getInput().getLocalStrategy());
-			
-			// local strategy keys
-			Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
-			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
-			Assert.assertTrue(join.getInput1().getLocalStrategySortOrder()[0] == join.getInput2().getLocalStrategySortOrder()[0]);
-			Assert.assertEquals(set01, reducer.getKeys(0));
-			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-			return true;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
deleted file mode 100644
index 6cfef9c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.examples;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.distributions.SimpleDistribution;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class WordCountCompilerTest extends CompilerTestBase {
-	
-	/**
-	 * This method tests the simple word count.
-	 */
-	@Test
-	public void testWordCount() {
-		checkWordCount(true);
-		checkWordCount(false);
-	}
-	
-	private void checkWordCount(boolean estimates) {
-		try {
-			WordCount wc = new WordCount();
-			ExecutionConfig ec = new ExecutionConfig();
-			Plan p = wc.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
-			p.setExecutionConfig(ec);
-
-			OptimizedPlan plan;
-			if (estimates) {
-				FileDataSource source = getContractResolver(p).getNode("Input Lines");
-				setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
-				plan = compileWithStats(p);
-			} else {
-				plan = compileNoStats(p);
-			}
-			
-			// get the optimizer plan nodes
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
-			SinkPlanNode sink = resolver.getNode("Word Counts");
-			SingleInputPlanNode reducer = resolver.getNode("Count Words");
-			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-			
-			// verify the strategies
-			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			
-			Channel c = reducer.getInput();
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
-			FieldList l = new FieldList(0);
-			Assert.assertEquals(l, c.getShipStrategyKeys());
-			Assert.assertEquals(l, c.getLocalStrategyKeys());
-			Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-			
-			// check the combiner
-			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys(0));
-			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-			
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * This method tests that with word count and a range partitioned sink, the range partitioner is pushed down.
-	 */
-	@Test
-	public void testWordCountWithSortedSink() {
-		checkWordCountWithSortedSink(true);
-		checkWordCountWithSortedSink(false);
-	}
-	
-	private void checkWordCountWithSortedSink(boolean estimates) {
-		try {
-			FileDataSource sourceNode = new FileDataSource(new TextInputFormat(), IN_FILE, "Input Lines");
-			MapOperator mapNode = MapOperator.builder(new TokenizeLine())
-				.input(sourceNode)
-				.name("Tokenize Lines")
-				.build();
-			ReduceOperator reduceNode = ReduceOperator.builder(new CountWords(), StringValue.class, 0)
-				.input(mapNode)
-				.name("Count Words")
-				.build();
-			FileDataSink out = new FileDataSink(new CsvOutputFormat(), OUT_FILE, reduceNode, "Word Counts");
-			CsvOutputFormat.configureRecordFormat(out)
-				.recordDelimiter('\n')
-				.fieldDelimiter(' ')
-				.lenient(true)
-				.field(StringValue.class, 0)
-				.field(IntValue.class, 1);
-			
-			Ordering ordering = new Ordering(0, StringValue.class, Order.DESCENDING);
-			out.setGlobalOrder(ordering, new SimpleDistribution(new StringValue[] {new StringValue("N")}));
-
-			ExecutionConfig ec = new ExecutionConfig();
-			Plan p = new Plan(out, "WordCount Example");
-			p.setDefaultParallelism(DEFAULT_PARALLELISM);
-			p.setExecutionConfig(ec);
-	
-			OptimizedPlan plan;
-			if (estimates) {
-				setSourceStatistics(sourceNode, 1024*1024*1024*1024L, 24f);
-				plan = compileWithStats(p);
-			} else {
-				plan = compileNoStats(p);
-			}
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
-			SinkPlanNode sink = resolver.getNode("Word Counts");
-			SingleInputPlanNode reducer = resolver.getNode("Count Words");
-			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-			
-			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			
-			Channel c = reducer.getInput();
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
-			FieldList l = new FieldList(0);
-			Assert.assertEquals(l, c.getShipStrategyKeys());
-			Assert.assertEquals(l, c.getLocalStrategyKeys());
-			
-			// check that the sort orders are descending
-			Assert.assertFalse(c.getShipStrategySortOrder()[0]);
-			Assert.assertFalse(c.getLocalStrategySortOrder()[0]);
-			
-			// check the combiner
-			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys(0));
-			Assert.assertEquals(l, combiner.getKeys(1));
-			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
deleted file mode 100644
index 10f2b5c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
-	
-	private static final String VERTEX_SOURCE = "Vertices";
-	
-	private static final String ITERATION_NAME = "Connected Components Iteration";
-	
-	private static final String EDGES_SOURCE = "Edges";
-	private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
-	private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
-	
-	private static final String SINK = "Result";
-	
-	private static final boolean PRINT_PLAN = false;
-	
-	private final FieldList set0 = new FieldList(0);
-	
-	
-	@Test
-	public void testWorksetConnectedComponents() {
-		ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
-
-		Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
-				IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
-		plan.setExecutionConfig(new ExecutionConfig());
-		OptimizedPlan optPlan = compileNoStats(plan);
-		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
-		
-		if (PRINT_PLAN) {
-			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-			String json = dumper.getOptimizerPlanAsJSON(optPlan);
-			System.out.println(json);
-		}
-		
-		SourcePlanNode vertexSource = or.getNode(VERTEX_SOURCE);
-		SourcePlanNode edgesSource = or.getNode(EDGES_SOURCE);
-		SinkPlanNode sink = or.getNode(SINK);
-		WorksetIterationPlanNode iter = or.getNode(ITERATION_NAME);
-		
-		DualInputPlanNode neighborsJoin = or.getNode(JOIN_NEIGHBORS_MATCH);
-		DualInputPlanNode cogroup = or.getNode(MIN_ID_AND_UPDATE);
-		
-		// --------------------------------------------------------------------
-		// Plan validation:
-		//
-		// We expect the plan to go with a sort-merge join, because the CoGroup
-		// sorts and the join in the successive iteration can re-exploit the sorting.
-		// --------------------------------------------------------------------
-		
-		// test all drivers
-		Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
-		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
-		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
-		
-		Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy());
-		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
-		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
-		
-		Assert.assertEquals(DriverStrategy.CO_GROUP, cogroup.getDriverStrategy());
-		Assert.assertEquals(set0, cogroup.getKeysForInput1());
-		Assert.assertEquals(set0, cogroup.getKeysForInput2());
-		
-		// test all the shipping strategies
-		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialSolutionSetInput().getShipStrategy());
-		Assert.assertEquals(set0, iter.getInitialSolutionSetInput().getShipStrategyKeys());
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialWorksetInput().getShipStrategy());
-		Assert.assertEquals(set0, iter.getInitialWorksetInput().getShipStrategyKeys());
-		
-		Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
-		Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
-		Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
-		
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, cogroup.getInput1().getShipStrategy()); // min id
-		Assert.assertEquals(ShipStrategyType.FORWARD, cogroup.getInput2().getShipStrategy()); // solution set
-		
-		// test all the local strategies
-		Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-		
-		// the sort for the neighbor join in the first iteration is pushed out of the loop
-		Assert.assertEquals(LocalStrategy.SORT, iter.getInitialWorksetInput().getLocalStrategy());
-		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
-		Assert.assertEquals(LocalStrategy.SORT, neighborsJoin.getInput2().getLocalStrategy()); // edges
-		
-		Assert.assertEquals(LocalStrategy.SORT, cogroup.getInput1().getLocalStrategy());
-		Assert.assertEquals(LocalStrategy.NONE, cogroup.getInput2().getLocalStrategy()); // solution set
-		
-		// check the caches
-		Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode());
-		
-		JobGraphGenerator jgg = new JobGraphGenerator();
-		jgg.compileJobGraph(optPlan);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
deleted file mode 100644
index bd4b6be..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class IterativeKMeansTest extends CompilerTestBase {
-	
-	private static final String DATAPOINTS = "Data Points";
-	private static final String CENTERS = "Centers";
-	
-	private static final String MAPPER_NAME = "Find Nearest Centers";
-	private static final String REDUCER_NAME = "Recompute Center Positions";
-	
-	private static final String ITERATION_NAME = "k-means loop";
-	
-	private static final String SINK = "New Center Positions";
-	
-	private final FieldList set0 = new FieldList(0);
-	
-	// --------------------------------------------------------------------------------------------
-	//  K-Means (Bulk Iteration)
-	// --------------------------------------------------------------------------------------------
-	
-	@Test
-	public void testCompileKMeansSingleStepWithStats() {
-		
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		// set the statistics
-		OperatorResolver cr = getContractResolver(p);
-		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-		FileDataSource centersSource = cr.getNode(CENTERS);
-		setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-		setSourceStatistics(centersSource, 1024*1024, 32f);
-		
-		OptimizedPlan plan = compileWithStats(p);
-		checkPlan(plan);
-		
-		new JobGraphGenerator().compileJobGraph(plan);
-	}
-
-	@Test
-	public void testCompileKMeansSingleStepWithOutStats() {
-		
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		OptimizedPlan plan = compileNoStats(p);
-		checkPlan(plan);
-		
-		new JobGraphGenerator().compileJobGraph(plan);
-	}
-	
-	private void checkPlan(OptimizedPlan plan) {
-		
-		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-		
-		final SinkPlanNode sink = or.getNode(SINK);
-		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
-		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-		
-		final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-		
-		// -------------------- outside the loop -----------------------
-		
-		// check the sink
-		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-		
-		// check the iteration
-		assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
-		
-		
-		// -------------------- inside the loop -----------------------
-		
-		// check the mapper
-		assertEquals(1, mapper.getBroadcastInputs().size());
-		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-		assertFalse(mapper.getInput().isOnDynamicPath());
-		assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
-		assertTrue(mapper.getInput().getTempMode().isCached());
-		
-		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
-		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-		
-		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-		
-		assertNull(mapper.getInput().getLocalStrategyKeys());
-		assertNull(mapper.getInput().getLocalStrategySortOrder());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-		
-		// check the combiner
-		Assert.assertNotNull(combiner);
-		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-		assertTrue(combiner.getInput().isOnDynamicPath());
-		
-		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-		assertNull(combiner.getInput().getLocalStrategyKeys());
-		assertNull(combiner.getInput().getLocalStrategySortOrder());
-		assertEquals(set0, combiner.getKeys(0));
-		assertEquals(set0, combiner.getKeys(1));
-		
-		// check the reducer
-		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-		assertTrue(reducer.getInput().isOnDynamicPath());
-		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		assertEquals(set0, reducer.getKeys(0));
-		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
deleted file mode 100644
index aea448f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-
-@SuppressWarnings("serial")
-public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
-		
-	private static final String JOIN_1 = "join1";
-	private static final String JOIN_2 = "join2";
-		
-	@Test
-	public void testMultiSolutionSetJoinPlan() {
-		try {
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple2<Long, Double>> inputData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0));
-			DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
-			
-			// add two sinks, to test the case of branching after an iteration
-			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
-			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
-		
-			Plan p = env.createProgramPlan();
-			
-			OptimizedPlan optPlan = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
-			
-			DualInputPlanNode join1 = or.getNode(JOIN_1);
-			DualInputPlanNode join2 = or.getNode(JOIN_2);
-			
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, join1.getDriverStrategy());
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, join2.getDriverStrategy());
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, join1.getInput2().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, join2.getInput1().getShipStrategy());
-			
-			assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass());
-			assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass());
-			
-			new JobGraphGenerator().compileJobGraph(optPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test erroneous: " + e.getMessage());
-		}
-	}
-	
-	
-	
-	public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
-
-		DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);
-		
-		DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
-				.join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
-				.groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
-				.join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);
-		
-		DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);
-		
-		DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
-		
-		return result;
-	}
-	
-	public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
-		@Override
-		public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
-			return new Tuple2<Long, Double>(first.f0, first.f1 + second.f1);
-		}
-	}
-	
-	public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
-		@Override
-		public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
-			return new Tuple2<Long, Double>(first.f0, first.f1 + first.f2 + second.f1);
-		}
-	}
-	
-	public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
-		@Override
-		public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
-			out.collect(value);
-			out.collect(value);
-		}
-	}
-	
-	public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
-
-		@Override
-		public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {
-			return new Tuple3<Long, Double, Double>(value.f0, value.f1, value.f1 * 2);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
deleted file mode 100644
index a3b7572..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.compiler.iterations;
-
-import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
-import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
-import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
-import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
-import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-public class PageRankCompilerTest extends CompilerTestBase{
-	
-	@Test
-	public void testPageRank() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			// get input data
-			DataSet<Long> pagesInput = env.fromElements(1l);
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple2<Long, Long>> linksInput =env.fromElements(new Tuple2<Long, Long>(1l, 2l));
-			
-			// assign initial rank to pages
-			DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
-					map(new RankAssigner((1.0d / 10)));
-			
-			// build adjacency list from link input
-			DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
-					linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
-			
-			// set iterative data set
-			IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
-			
-			Configuration cfg = new Configuration();
-			cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-			
-			DataSet<Tuple2<Long, Double>> newRanks = iteration
-					// join pages with outgoing edges and distribute rank
-					.join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
-					.flatMap(new JoinVertexWithEdgesMatch())
-					// collect and sum ranks
-					.groupBy(0).aggregate(SUM, 1)
-					// apply dampening factor
-					.map(new Dampener(0.85, 10));
-			
-			DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
-					newRanks, 
-					newRanks.join(iteration).where(0).equalTo(0)
-					// termination condition
-					.filter(new EpsilonFilter()));
-	
-			finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
-	
-			// get the plan and compile it
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
-			BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
-			
-			// check that the partitioning is pushed out of the first loop
-			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
-			Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
-			
-			BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
-			Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
deleted file mode 100644
index a981124..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/DumpCompiledPlanTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.plandump;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.Client.ProgramAbortException;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.examples.java.clustering.KMeans;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-/*
- * The tests in this class simply invokes the JSON dump code for the optimized plan.
- */
-public class DumpCompiledPlanTest extends CompilerTestBase {
-	
-	@Test
-	public void dumpWordCount() {
-		dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
-	}
-	
-	@Test
-	public void dumpTPCH3() {
-		dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
-	}
-	
-	@Test
-	public void dumpKMeans() {
-		dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
-	}
-	
-	@Test
-	public void dumpIterativeKMeans() {
-		// prepare the test environment
-		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
-		env.setAsContext();
-		try {
-			// <points path> <centers path> <result path> <num iterations
-			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
-		} catch(ProgramAbortException pae) {
-			// all good.
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("KMeans failed with an exception");
-		}
-		dump(env.getPlan());
-	}
-	
-	@Test
-	public void dumpWebLogAnalysis() {
-		dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
-	}
-
-	@Test
-	public void dumpBulkIterationKMeans() {
-		dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
-	}
-	
-	@Test
-	public void dumpDeltaPageRank() {
-		dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
-	}
-	
-	private void dump(Plan p) {
-		p.setExecutionConfig(new ExecutionConfig());
-		try {
-			OptimizedPlan op = compileNoStats(p);
-			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-			String json = dumper.getOptimizerPlanAsJSON(op);
-			JsonParser parser = new JsonFactory().createJsonParser(json);
-			while (parser.nextToken() != null);
-		} catch (JsonParseException e) {
-			e.printStackTrace();
-			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An error occurred in the test: " + e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
deleted file mode 100644
index b348333..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.plandump;
-
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * The tests in this class simply invoke the JSON dump code for the original
- * (pre-optimized) plan and read the generated JSON back with a parser.
- */
-public class PreviewPlanDumpTest {
-
-	/** Input location handed to the test programs; the URI form depends on the operating system. */
-	protected static final String IN_FILE =
-			OperatingSystem.isWindows() ? "file:/c:/test/file" : "file:///test/file";
-
-	/** Output location handed to the test programs; the URI form depends on the operating system. */
-	protected static final String OUT_FILE =
-			OperatingSystem.isWindows() ? "file:/c:/test/output" : "file:///test/output";
-
-	/** Empty argument list, used to request a plan without any program arguments. */
-	protected static final String[] NO_ARGS = new String[0];
-
-	/** First argument passed to every program when arguments are supplied. */
-	private static final String FIRST_ARG = "4";
-
-	@Test
-	public void dumpWordCount() {
-		final Plan withArgs = new WordCount().getPlan(FIRST_ARG, IN_FILE, OUT_FILE);
-		dump(withArgs);
-
-		// The web interface passes empty string-args to compute the preview of the
-		// job, so we should test this situation too
-		final Plan withoutArgs = new WordCount().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	@Test
-	public void dumpTPCH3() {
-		final Plan withArgs = new TPCHQuery3().getPlan(FIRST_ARG, IN_FILE, IN_FILE, OUT_FILE);
-		dump(withArgs);
-
-		final Plan withoutArgs = new TPCHQuery3().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	@Test
-	public void dumpKMeans() {
-		final Plan withArgs = new KMeansSingleStep().getPlan(FIRST_ARG, IN_FILE, IN_FILE, OUT_FILE);
-		dump(withArgs);
-
-		final Plan withoutArgs = new KMeansSingleStep().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	@Test
-	public void dumpWebLogAnalysis() {
-		final Plan withArgs = new WebLogAnalysis().getPlan(FIRST_ARG, IN_FILE, IN_FILE, IN_FILE, OUT_FILE);
-		dump(withArgs);
-
-		final Plan withoutArgs = new WebLogAnalysis().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	@Test
-	public void dumpBulkIterationKMeans() {
-		final Plan withArgs = new KMeansBroadcast().getPlan(FIRST_ARG, IN_FILE, OUT_FILE);
-		dump(withArgs);
-
-		final Plan withoutArgs = new KMeansBroadcast().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	@Test
-	public void dumpDeltaPageRank() {
-		final Plan withArgs = new DeltaPageRankWithInitialDeltas().getPlan(
-				FIRST_ARG, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10");
-		dump(withArgs);
-
-		final Plan withoutArgs = new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS);
-		dump(withoutArgs);
-	}
-
-	/**
-	 * Builds the pre-optimized plan for the given program plan, renders it as JSON,
-	 * and reads that JSON token by token until the parser reports the end of the input.
-	 * Any exception is turned into a JUnit failure.
-	 *
-	 * @param p the program plan to dump
-	 */
-	private void dump(Plan p) {
-		try {
-			final List<DataSinkNode> sinkNodes = Optimizer.createPreOptimizedPlan(p);
-			final String previewJson = new PlanJSONDumpGenerator().getPactPlanAsJSON(sinkNodes);
-			final JsonParser tokens = new JsonFactory().createJsonParser(previewJson);
-
-			// only the traversal matters: a parse error surfaces as JsonParseException below
-			while (tokens.nextToken() != null) {
-				// nothing to do per token
-			}
-		} catch (JsonParseException parseError) {
-			parseError.printStackTrace();
-			Assert.fail("JSON Generator produced malformatted output: " + parseError.getMessage());
-		} catch (Exception unexpected) {
-			unexpected.printStackTrace();
-			Assert.fail("An error occurred in the test: " + unexpected.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
index 8eea9f3..e6e91f6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
@@ -26,7 +26,7 @@ import org.junit.Assert;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.test.compiler.iterations.MultipleJoinsWithSolutionSetCompilerTest;
+import org.apache.flink.test.optimizer.iterations.MultipleJoinsWithSolutionSetCompilerTest;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
new file mode 100644
index 0000000..ab8ff45
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Compiles the {@link KMeansSingleStep} record job, once with source statistics and
+ * once without, and checks the ship, local, and driver strategies of the resulting
+ * optimized plan.
+ */
+@SuppressWarnings("deprecation")
+public class KMeansSingleStepTest extends CompilerTestBase {
+
+	// names under which the operators are looked up in the plan
+	private static final String DATAPOINTS = "Data Points";
+	private static final String CENTERS = "Centers";
+
+	private static final String MAPPER_NAME = "Find Nearest Centers";
+	private static final String REDUCER_NAME = "Recompute Center Positions";
+
+	private static final String SINK = "New Center Positions";
+
+	/** Key field list (field 0) expected on the combiner and the reducer. */
+	private final FieldList set0 = new FieldList(0);
+
+
+	/** Compiles the job with statistics set on both sources and checks the resulting plan. */
+	@Test
+	public void testCompileKMeansSingleStepWithStats() {
+
+		Plan p = newKMeansSingleStepPlan();
+
+		// set the statistics
+		OperatorResolver cr = getContractResolver(p);
+		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
+		FileDataSource centersSource = cr.getNode(CENTERS);
+		// upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit '1'
+		setSourceStatistics(pointsSource, 100L * 1024 * 1024 * 1024, 32f);
+		setSourceStatistics(centersSource, 1024 * 1024, 32f);
+
+		OptimizedPlan plan = compileWithStats(p);
+		checkPlan(plan);
+	}
+
+	/** Compiles the job via {@code compileNoStats} and checks the resulting plan. */
+	@Test
+	public void testCompileKMeansSingleStepWithOutStats() {
+
+		Plan p = newKMeansSingleStepPlan();
+
+		OptimizedPlan plan = compileNoStats(p);
+		checkPlan(plan);
+	}
+
+
+	/**
+	 * Creates the plan of the KMeans single-step job with the arguments shared by both
+	 * tests and attaches a fresh {@link ExecutionConfig} to it.
+	 *
+	 * @return the plan, ready to be handed to the optimizer
+	 */
+	private Plan newKMeansSingleStepPlan() {
+		KMeansSingleStep kmi = new KMeansSingleStep();
+		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+		p.setExecutionConfig(new ExecutionConfig());
+		return p;
+	}
+
+	/**
+	 * Asserts the expected strategies on the mapper, the combiner (the reducer's
+	 * predecessor), the reducer, and the sink of the given plan.
+	 *
+	 * @param plan the optimized plan to inspect
+	 */
+	private void checkPlan(OptimizedPlan plan) {
+
+		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+
+		final SinkPlanNode sink = or.getNode(SINK);
+		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
+		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
+
+		// check the mapper: regular input forwarded, exactly one broadcast input
+		assertEquals(1, mapper.getBroadcastInputs().size());
+		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
+
+		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
+		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
+
+		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+
+		assertNull(mapper.getInput().getLocalStrategyKeys());
+		assertNull(mapper.getInput().getLocalStrategySortOrder());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
+
+
+		// check the combiner
+		Assert.assertNotNull(combiner);
+		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+		assertNull(combiner.getInput().getLocalStrategyKeys());
+		assertNull(combiner.getInput().getLocalStrategySortOrder());
+		assertEquals(set0, combiner.getKeys(0));
+		assertEquals(set0, combiner.getKeys(1));
+
+		// check the reducer: hash-partitioned input, sorted with the reducer's own sort order
+		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+		assertEquals(set0, reducer.getKeys(0));
+		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
+		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+
+		// check the sink
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+	}
+}