Posted to commits@flink.apache.org by se...@apache.org on 2015/08/18 16:38:18 UTC

[4/7] flink git commit: [tests] Rename 'compiler' tests to 'optimizer' tests for consistent naming

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
new file mode 100644
index 0000000..f58486b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests TPCH Q3 (simplified) under various input conditions.
+ */
+@SuppressWarnings("deprecation")
+public class RelationalQueryCompilerTest extends CompilerTestBase {
+	
+	private static final String ORDERS = "Orders";
+	private static final String LINEITEM = "LineItems";
+	private static final String MAPPER_NAME = "FilterO";
+	private static final String JOIN_NAME = "JoinLiO";
+	
+	private final FieldList set0 = new FieldList(0);
+	private final FieldList set01 = new FieldList(new int[] {0,1});
+	private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
+	
+	// ------------------------------------------------------------------------
+	
+	
+	/**
+	 * Verifies that a robust repartitioning plan with a hash join is created in the absence of statistics.
+	 */
+	@Test
+	public void testQueryNoStatistics() {
+		try {
+			TPCHQuery3 query = new TPCHQuery3();
+			Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+			p.setExecutionConfig(defaultExecutionConfig);
+			// compile
+			final OptimizedPlan plan = compileNoStats(p);
+			
+			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+			
+			// get the nodes from the final plan
+			final SinkPlanNode sink = or.getNode("Output");
+			final SingleInputPlanNode reducer = or.getNode("AggLio");
+			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
+					(SingleInputPlanNode) reducer.getPredecessor() : null;
+			final DualInputPlanNode join = or.getNode("JoinLiO");
+			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+			
+			// verify the optimizer choices
+			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
+			Assert.assertTrue(checkRepartitionShipStrategies(join, reducer, combiner));
+			Assert.assertTrue(checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, false));
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Checks if any valid plan is produced. Hash joins are expected to build the orders side, as the statistics
+	 * indicate this to be the smaller one.
+	 */
+	@Test
+	public void testQueryAnyValidPlan() {
+		testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
+	}
+	
+	/**
+	 * Verifies that the plan compiles even when the input size estimates are zero.
+	 */
+	@Test
+	public void testQueryWithSizeZeroInputs() {
+		testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
+	}
+	
+	/**
+	 * Statistics that push towards a broadcast join.
+	 */
+	@Test
+	public void testQueryWithStatsForBroadcastHash() {
+		testQueryGeneric(1024L*1024*1024*1024, 1024L*1024*1024*1024, 0.05f, true, false, true, false, false);
+	}
+	
+	/**
+	 * Statistics that push towards a repartitioned join, with any local join strategy.
+	 */
+	@Test
+	public void testQueryWithStatsForRepartitionAny() {
+		testQueryGeneric(100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
+	}
+	
+	/**
+	 * Statistics that push towards a repartition merge join. If the join blows the data volume up significantly,
+	 * re-exploiting the sorted order is cheaper.
+	 */
+	@Test
+	public void testQueryWithStatsForRepartitionMerge() {
+		TPCHQuery3 query = new TPCHQuery3();
+		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+		p.setExecutionConfig(defaultExecutionConfig);
+		// set compiler hints
+		OperatorResolver cr = getContractResolver(p);
+		JoinOperator match = cr.getNode("JoinLiO");
+		match.getCompilerHints().setFilterFactor(100f);
+		
+		testQueryGeneric(p, 100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
+	}
+	
+	// ------------------------------------------------------------------------
+	
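+	/**
+	 * Compiles the query with the given input sizes and selectivities and validates the chosen plan.
+	 * The boolean flags state which plan variants the optimizer may legitimately choose under these
+	 * statistics: broadcast or repartitioned shipping, a hash join building either input, or a merge join.
+	 */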
+	private void testQueryGeneric(long orderSize, long lineItemSize, 
+			float ordersFilterFactor, 
+			boolean broadcastOkay, boolean partitionedOkay,
+			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+	{
+		testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
+	}
+	
+	private void testQueryGeneric(long orderSize, long lineItemSize, 
+			float ordersFilterFactor, float joinFilterFactor,
+			boolean broadcastOkay, boolean partitionedOkay,
+			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+	{
+		TPCHQuery3 query = new TPCHQuery3();
+		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+		p.setExecutionConfig(defaultExecutionConfig);
+		testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
+	}
+		
+	private void testQueryGeneric(Plan p, long orderSize, long lineitemSize, 
+			float orderSelectivity, float joinSelectivity, 
+			boolean broadcastOkay, boolean partitionedOkay,
+			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
+	{
+		try {
+			// set statistics
+			OperatorResolver cr = getContractResolver(p);
+			FileDataSource ordersSource = cr.getNode(ORDERS);
+			FileDataSource lineItemSource = cr.getNode(LINEITEM);
+			MapOperator mapper = cr.getNode(MAPPER_NAME);
+			JoinOperator joiner = cr.getNode(JOIN_NAME);
+			setSourceStatistics(ordersSource, orderSize, 100f);
+			setSourceStatistics(lineItemSource, lineitemSize, 140f);
+			mapper.getCompilerHints().setAvgOutputRecordSize(16f);
+			mapper.getCompilerHints().setFilterFactor(orderSelectivity);
+			joiner.getCompilerHints().setFilterFactor(joinSelectivity);
+			
+			// compile
+			final OptimizedPlan plan = compileWithStats(p);
+			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+			
+			// get the nodes from the final plan
+			final SinkPlanNode sink = or.getNode("Output");
+			final SingleInputPlanNode reducer = or.getNode("AggLio");
+			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
+					(SingleInputPlanNode) reducer.getPredecessor() : null;
+			final DualInputPlanNode join = or.getNode("JoinLiO");
+			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+			
+			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
+			
+			// check the possible variants and that the chosen variant is allowed in this specific setting
+			if (checkBroadcastShipStrategies(join, reducer, combiner)) {
+				Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);
+				
+				if (checkHashJoinStrategies(join, reducer, true)) {
+					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
+				} else if (checkHashJoinStrategies(join, reducer, false)) {
+					Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
+				} else if (checkBroadcastMergeJoin(join, reducer)) {
+					Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
+				} else {
+					Assert.fail("Plan has no correct hash join or merge join strategies.");
+				}
+			}
+			else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
+				Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);
+				
+				if (checkHashJoinStrategies(join, reducer, true)) {
+					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
+				} else if (checkHashJoinStrategies(join, reducer, false)) {
+					Assert.assertTrue("Hash join (build lineitem) incorrectly chosen", hashJoinSecondOkay);
+				} else if (checkRepartitionMergeJoin(join, reducer)) {
+					Assert.assertTrue("Merge join incorrectly chosen", mergeJoinOkay);
+				} else {
+					Assert.fail("Plan has no correct hash join or merge join strategies.");
+				}
+			} else {
+				Assert.fail("Plan has neither a correct broadcast join nor a partitioned join configuration.");
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checks for special conditions
+	// ------------------------------------------------------------------------
+	
+	private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
+			SingleInputPlanNode reducer, SinkPlanNode sink)
+	{
+		// check the ship strategies that are always fixed
+		Assert.assertEquals(ShipStrategyType.FORWARD, map.getInput().getShipStrategy());
+		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		
+		// check the driver strategies that are always fixed
+		Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
+		if (combiner != null) {
+			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+			Assert.assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+		}
+	}
+	
+	private boolean checkBroadcastShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
+			SingleInputPlanNode combiner)
+	{
+		if (ShipStrategyType.BROADCAST == join.getInput1().getShipStrategy() &&
+			ShipStrategyType.FORWARD == join.getInput2().getShipStrategy() &&
+			ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy())
+		{
+			// check combiner
+			Assert.assertNotNull("Plan should have a combiner", combiner);
+			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	private boolean checkRepartitionShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
+			SingleInputPlanNode combiner)
+	{
+		if (ShipStrategyType.PARTITION_HASH == join.getInput1().getShipStrategy() &&
+			ShipStrategyType.PARTITION_HASH == join.getInput2().getShipStrategy() &&
+			ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy())
+		{
+			// check combiner
+			Assert.assertNull("Plan should not have a combiner", combiner);
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	private boolean checkHashJoinStrategies(DualInputPlanNode join, SingleInputPlanNode reducer, boolean buildFirst) {
+		if ( (buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
+			 (!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy()) ) 
+		{
+			// driver keys
+			Assert.assertEquals(set0, join.getKeysForInput1());
+			Assert.assertEquals(set0, join.getKeysForInput2());
+			
+			// local strategies
+			Assert.assertEquals(LocalStrategy.NONE, join.getInput1().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.NONE, join.getInput2().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+			
+			// local strategy keys
+			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
+		if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+			// driver keys
+			Assert.assertEquals(set0, join.getKeysForInput1());
+			Assert.assertEquals(set0, join.getKeysForInput2());
+			
+			// local strategies
+			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+			
+			// local strategy keys
+			Assert.assertEquals(set0, join.getInput1().getLocalStrategyKeys());
+			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
+			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
+			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
+		if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+			// driver keys
+			Assert.assertEquals(set0, join.getKeysForInput1());
+			Assert.assertEquals(set0, join.getKeysForInput2());
+			
+			// local strategies
+			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
+			Assert.assertEquals(LocalStrategy.NONE, reducer.getInput().getLocalStrategy());
+			
+			// local strategy keys
+			Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
+			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
+			Assert.assertTrue(join.getInput1().getLocalStrategySortOrder()[0] == join.getInput2().getLocalStrategySortOrder()[0]);
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+			return true;
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
new file mode 100644
index 0000000..ce71383
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.examples;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.distributions.SimpleDistribution;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.io.TextInputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
+import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class WordCountCompilerTest extends CompilerTestBase {
+	
+	/**
+	 * Tests the simple word count program, both with and without size estimates.
+	 */
+	@Test
+	public void testWordCount() {
+		checkWordCount(true);
+		checkWordCount(false);
+	}
+	
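+	/**
+	 * Compiles the word count plan, optionally with source size estimates, and verifies the
+	 * chosen ship, local, and driver strategies.
+	 */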
+	private void checkWordCount(boolean estimates) {
+		try {
+			WordCount wc = new WordCount();
+			ExecutionConfig ec = new ExecutionConfig();
+			Plan p = wc.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
+			p.setExecutionConfig(ec);
+
+			OptimizedPlan plan;
+			if (estimates) {
+				FileDataSource source = getContractResolver(p).getNode("Input Lines");
+				setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
+				plan = compileWithStats(p);
+			} else {
+				plan = compileNoStats(p);
+			}
+			
+			// get the optimizer plan nodes
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
+			SinkPlanNode sink = resolver.getNode("Word Counts");
+			SingleInputPlanNode reducer = resolver.getNode("Count Words");
+			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
+			
+			// verify the strategies
+			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			Channel c = reducer.getInput();
+			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
+			FieldList l = new FieldList(0);
+			Assert.assertEquals(l, c.getShipStrategyKeys());
+			Assert.assertEquals(l, c.getLocalStrategyKeys());
+			Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+			
+			// check the combiner
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+			Assert.assertEquals(l, combiner.getKeys(0));
+			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+			
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Tests that, for word count with a range-partitioned sink, the range partitioning is pushed down in front of the reducer.
+	 */
+	@Test
+	public void testWordCountWithSortedSink() {
+		checkWordCountWithSortedSink(true);
+		checkWordCountWithSortedSink(false);
+	}
+	
+	private void checkWordCountWithSortedSink(boolean estimates) {
+		try {
+			FileDataSource sourceNode = new FileDataSource(new TextInputFormat(), IN_FILE, "Input Lines");
+			MapOperator mapNode = MapOperator.builder(new TokenizeLine())
+				.input(sourceNode)
+				.name("Tokenize Lines")
+				.build();
+			ReduceOperator reduceNode = ReduceOperator.builder(new CountWords(), StringValue.class, 0)
+				.input(mapNode)
+				.name("Count Words")
+				.build();
+			FileDataSink out = new FileDataSink(new CsvOutputFormat(), OUT_FILE, reduceNode, "Word Counts");
+			CsvOutputFormat.configureRecordFormat(out)
+				.recordDelimiter('\n')
+				.fieldDelimiter(' ')
+				.lenient(true)
+				.field(StringValue.class, 0)
+				.field(IntValue.class, 1);
+			
+			Ordering ordering = new Ordering(0, StringValue.class, Order.DESCENDING);
+			out.setGlobalOrder(ordering, new SimpleDistribution(new StringValue[] {new StringValue("N")}));
+
+			ExecutionConfig ec = new ExecutionConfig();
+			Plan p = new Plan(out, "WordCount Example");
+			p.setDefaultParallelism(DEFAULT_PARALLELISM);
+			p.setExecutionConfig(ec);
+	
+			OptimizedPlan plan;
+			if (estimates) {
+				setSourceStatistics(sourceNode, 1024*1024*1024*1024L, 24f);
+				plan = compileWithStats(p);
+			} else {
+				plan = compileNoStats(p);
+			}
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
+			SinkPlanNode sink = resolver.getNode("Word Counts");
+			SingleInputPlanNode reducer = resolver.getNode("Count Words");
+			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
+			
+			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+			Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
+			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			Channel c = reducer.getInput();
+			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
+			FieldList l = new FieldList(0);
+			Assert.assertEquals(l, c.getShipStrategyKeys());
+			Assert.assertEquals(l, c.getLocalStrategyKeys());
+			
+			// check that the sort orders are descending
+			Assert.assertFalse(c.getShipStrategySortOrder()[0]);
+			Assert.assertFalse(c.getLocalStrategySortOrder()[0]);
+			
+			// check the combiner
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+			Assert.assertEquals(l, combiner.getKeys(0));
+			Assert.assertEquals(l, combiner.getKeys(1));
+			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
new file mode 100644
index 0000000..de5fde0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the plan produced for the Connected Components program that uses a CoGroup
+ * to compute the minimum component id and update the solution set.
+ */
+public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
+	
+	private static final String VERTEX_SOURCE = "Vertices";
+	
+	private static final String ITERATION_NAME = "Connected Components Iteration";
+	
+	private static final String EDGES_SOURCE = "Edges";
+	private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
+	private static final String MIN_ID_AND_UPDATE = "Min Id and Update";
+	
+	private static final String SINK = "Result";
+	
+	private static final boolean PRINT_PLAN = false;
+	
+	private final FieldList set0 = new FieldList(0);
+	
+	
+	@Test
+	public void testWorksetConnectedComponents() {
+		ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
+
+		Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
+				IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
+		plan.setExecutionConfig(new ExecutionConfig());
+		OptimizedPlan optPlan = compileNoStats(plan);
+		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
+		
+		if (PRINT_PLAN) {
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			String json = dumper.getOptimizerPlanAsJSON(optPlan);
+			System.out.println(json);
+		}
+		
+		SourcePlanNode vertexSource = or.getNode(VERTEX_SOURCE);
+		SourcePlanNode edgesSource = or.getNode(EDGES_SOURCE);
+		SinkPlanNode sink = or.getNode(SINK);
+		WorksetIterationPlanNode iter = or.getNode(ITERATION_NAME);
+		
+		DualInputPlanNode neighborsJoin = or.getNode(JOIN_NEIGHBORS_MATCH);
+		DualInputPlanNode cogroup = or.getNode(MIN_ID_AND_UPDATE);
+		
+		// --------------------------------------------------------------------
+		// Plan validation:
+		//
+		// We expect the plan to go with a sort-merge join, because the CoGroup
+		// sorts and the join in the successive iteration can re-exploit the sorting.
+		// --------------------------------------------------------------------
+		
+		// test all drivers
+		Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
+		
+		Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy());
+		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
+		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
+		
+		Assert.assertEquals(DriverStrategy.CO_GROUP, cogroup.getDriverStrategy());
+		Assert.assertEquals(set0, cogroup.getKeysForInput1());
+		Assert.assertEquals(set0, cogroup.getKeysForInput2());
+		
+		// test all the shipping strategies
+		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialSolutionSetInput().getShipStrategy());
+		Assert.assertEquals(set0, iter.getInitialSolutionSetInput().getShipStrategyKeys());
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iter.getInitialWorksetInput().getShipStrategy());
+		Assert.assertEquals(set0, iter.getInitialWorksetInput().getShipStrategyKeys());
+		
+		Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
+		Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
+		Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
+		
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, cogroup.getInput1().getShipStrategy()); // min id
+		Assert.assertEquals(ShipStrategyType.FORWARD, cogroup.getInput2().getShipStrategy()); // solution set
+		
+		// test all the local strategies
+		Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
+		
+		// the sort for the neighbor join in the first iteration is pushed out of the loop
+		Assert.assertEquals(LocalStrategy.SORT, iter.getInitialWorksetInput().getLocalStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
+		Assert.assertEquals(LocalStrategy.SORT, neighborsJoin.getInput2().getLocalStrategy()); // edges
+		
+		Assert.assertEquals(LocalStrategy.SORT, cogroup.getInput1().getLocalStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, cogroup.getInput2().getLocalStrategy()); // solution set
+		
+		// check the caches
+		Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode());
+		
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		jgg.compileJobGraph(optPlan);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
new file mode 100644
index 0000000..3785270
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class IterativeKMeansTest extends CompilerTestBase {
+	
+	private static final String DATAPOINTS = "Data Points";
+	private static final String CENTERS = "Centers";
+	
+	private static final String MAPPER_NAME = "Find Nearest Centers";
+	private static final String REDUCER_NAME = "Recompute Center Positions";
+	
+	private static final String ITERATION_NAME = "k-means loop";
+	
+	private static final String SINK = "New Center Positions";
+	
+	private final FieldList set0 = new FieldList(0);
+	
+	// --------------------------------------------------------------------------------------------
+	//  K-Means (Bulk Iteration)
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testCompileKMeansSingleStepWithStats() {
+		
+		KMeansBroadcast kmi = new KMeansBroadcast();
+		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+		p.setExecutionConfig(new ExecutionConfig());
+		// set the statistics
+		OperatorResolver cr = getContractResolver(p);
+		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
+		FileDataSource centersSource = cr.getNode(CENTERS);
+		setSourceStatistics(pointsSource, 100L*1024*1024*1024, 32f);
+		setSourceStatistics(centersSource, 1024*1024, 32f);
+		
+		OptimizedPlan plan = compileWithStats(p);
+		checkPlan(plan);
+		
+		new JobGraphGenerator().compileJobGraph(plan);
+	}
+
+	@Test
+	public void testCompileKMeansSingleStepWithOutStats() {
+		
+		KMeansBroadcast kmi = new KMeansBroadcast();
+		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+		p.setExecutionConfig(new ExecutionConfig());
+		OptimizedPlan plan = compileNoStats(p);
+		checkPlan(plan);
+		
+		new JobGraphGenerator().compileJobGraph(plan);
+	}
+	
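+	/**
+	 * Verifies the ship, local, and driver strategies chosen for the bulk-iteration k-means plan.
+	 */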
+	private void checkPlan(OptimizedPlan plan) {
+		
+		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
+		
+		final SinkPlanNode sink = or.getNode(SINK);
+		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
+		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
+		
+		final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
+		
+		// -------------------- outside the loop -----------------------
+		
+		// check the sink
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
+		
+		// check the iteration
+		assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
+		assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
+		
+		
+		// -------------------- inside the loop -----------------------
+		
+		// check the mapper
+		assertEquals(1, mapper.getBroadcastInputs().size());
+		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
+		assertFalse(mapper.getInput().isOnDynamicPath());
+		assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
+		assertTrue(mapper.getInput().getTempMode().isCached());
+		
+		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
+		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
+		
+		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+		
+		assertNull(mapper.getInput().getLocalStrategyKeys());
+		assertNull(mapper.getInput().getLocalStrategySortOrder());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
+		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
+		
+		// check the combiner
+		Assert.assertNotNull(combiner);
+		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		assertTrue(combiner.getInput().isOnDynamicPath());
+		
+		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+		assertNull(combiner.getInput().getLocalStrategyKeys());
+		assertNull(combiner.getInput().getLocalStrategySortOrder());
+		assertEquals(set0, combiner.getKeys(0));
+		assertEquals(set0, combiner.getKeys(1));
+		
+		// check the reducer
+		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+		assertTrue(reducer.getInput().isOnDynamicPath());
+		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
+		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
+		assertEquals(set0, reducer.getKeys(0));
+		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
+		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
new file mode 100644
index 0000000..f17b28a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+
+@SuppressWarnings("serial")
+public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
+		
+	private static final String JOIN_1 = "join1";
+	private static final String JOIN_2 = "join2";
+		
+	@Test
+	public void testMultiSolutionSetJoinPlan() {
+		try {
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Double>> inputData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0));
+			DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
+			
+			// add two sinks, to test the case of branching after an iteration
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+		
+			Plan p = env.createProgramPlan();
+			
+			OptimizedPlan optPlan = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
+			
+			DualInputPlanNode join1 = or.getNode(JOIN_1);
+			DualInputPlanNode join2 = or.getNode(JOIN_2);
+			
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, join1.getDriverStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, join2.getDriverStrategy());
+			
+			assertEquals(ShipStrategyType.PARTITION_HASH, join1.getInput2().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, join2.getInput1().getShipStrategy());
+			
+			assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass());
+			assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass());
+			
+			new JobGraphGenerator().compileJobGraph(optPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test erroneous: " + e.getMessage());
+		}
+	}
+	
+	
+	
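+	/**
+	 * Builds a delta iteration in which the solution set is joined twice per superstep:
+	 * first with the duplicated workset (join1) and then with the aggregated intermediate result (join2).
+	 */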
+	public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
+
+		DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);
+		
+		DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
+				.join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
+				.groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
+				.join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);
+		
+		DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);
+		
+		DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
+		
+		return result;
+	}
+	
+	public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
+			return new Tuple2<Long, Double>(first.f0, first.f1 + second.f1);
+		}
+	}
+	
+	public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
+			return new Tuple2<Long, Double>(first.f0, first.f1 + first.f2 + second.f1);
+		}
+	}
+	
+	public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		@Override
+		public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+	
+	public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
+
+		@Override
+		public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {
+			return new Tuple3<Long, Double, Double>(value.f0, value.f1, value.f1 * 2);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
new file mode 100644
index 0000000..9b17270
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/PageRankCompilerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.optimizer.iterations;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
+import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
+import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
+import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
+import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+public class PageRankCompilerTest extends CompilerTestBase {
+	
+	@Test
+	public void testPageRank() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// get input data
+			DataSet<Long> pagesInput = env.fromElements(1L);
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> linksInput = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			// assign initial rank to pages
+			DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
+					map(new RankAssigner((1.0d / 10)));
+			
+			// build adjacency list from link input
+			DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
+					linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+			
+			// set iterative data set
+			IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
+			
+			Configuration cfg = new Configuration();
+			cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			
+			DataSet<Tuple2<Long, Double>> newRanks = iteration
+					// join pages with outgoing edges and distribute rank
+					.join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
+					.flatMap(new JoinVertexWithEdgesMatch())
+					// collect and sum ranks
+					.groupBy(0).aggregate(SUM, 1)
+					// apply dampening factor
+					.map(new Dampener(0.85, 10));
+			
+			DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+					newRanks, 
+					newRanks.join(iteration).where(0).equalTo(0)
+					// termination condition
+					.filter(new EpsilonFilter()));
+	
+			finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+	
+			// get the plan and compile it
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
+			
+			// check that the partitioning is pushed out of the first loop
+			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
+			Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
+			
+			BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
+			Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
new file mode 100644
index 0000000..082532e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.jsonplan;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.Client.ProgramAbortException;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+/*
+ * The tests in this class simply invoke the JSON dump code for the optimized plan.
+ */
+public class DumpCompiledPlanTest extends CompilerTestBase {
+	
+	@Test
+	public void dumpWordCount() {
+		dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+	}
+	
+	@Test
+	public void dumpTPCH3() {
+		dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+	}
+	
+	@Test
+	public void dumpKMeans() {
+		dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+	}
+	
+	@Test
+	public void dumpIterativeKMeans() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			// <points path> <centers path> <result path> <num iterations>
+			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("KMeans failed with an exception");
+		}
+		dump(env.getPlan());
+	}
+	
+	@Test
+	public void dumpWebLogAnalysis() {
+		dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+	}
+
+	@Test
+	public void dumpBulkIterationKMeans() {
+		dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+	}
+	
+	@Test
+	public void dumpDeltaPageRank() {
+		dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+	}
+	
+	private void dump(Plan p) {
+		p.setExecutionConfig(new ExecutionConfig());
+		try {
+			OptimizedPlan op = compileNoStats(p);
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			String json = dumper.getOptimizerPlanAsJSON(op);
+			JsonParser parser = new JsonFactory().createJsonParser(json);
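+			// pull all tokens to verify that the generated JSON is well-formed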
+			while (parser.nextToken() != null);
+		} catch (JsonParseException e) {
+			e.printStackTrace();
+			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An error occurred in the test: " + e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09fdfda7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
new file mode 100644
index 0000000..49fe6d8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.optimizer.jsonplan;
+
+import java.util.List;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
+import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
+import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
+import org.apache.flink.test.recordJobs.wordcount.WordCount;
+import org.apache.flink.util.OperatingSystem;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+/*
+ * The tests in this class simply invoke the JSON dump code for the original plan.
+ */
+public class PreviewPlanDumpTest {
+	
+	protected static final String IN_FILE = OperatingSystem.isWindows() ?  "file:/c:/test/file" : "file:///test/file";
+	
+	protected static final String OUT_FILE = OperatingSystem.isWindows() ?  "file:/c:/test/output" : "file:///test/output";
+	
+	protected static final String[] NO_ARGS = new String[0];
+
+	@Test
+	public void dumpWordCount() {
+		dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
+		
+		// The web interface passes empty string-args to compute the preview of the
+		// job, so we should test this situation too
+		dump(new WordCount().getPlan(NO_ARGS));
+	}
+	
+	@Test
+	public void dumpTPCH3() {
+		dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
+		dump(new TPCHQuery3().getPlan(NO_ARGS));
+	}
+	
+	@Test
+	public void dumpKMeans() {
+		dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
+		dump(new KMeansSingleStep().getPlan(NO_ARGS));
+	}
+	
+	@Test
+	public void dumpWebLogAnalysis() {
+		dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+		dump(new WebLogAnalysis().getPlan(NO_ARGS));
+	}
+	
+	@Test
+	public void dumpBulkIterationKMeans() {
+		dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
+		dump(new KMeansBroadcast().getPlan(NO_ARGS));
+	}
+	
+	@Test
+	public void dumpDeltaPageRank() {
+		dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+		dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+	}
+	
+	private void dump(Plan p) {
+		try {
+			List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			String json = dumper.getPactPlanAsJSON(sinks);
+			JsonParser parser = new JsonFactory().createJsonParser(json);
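+			// pull all tokens to verify that the generated JSON is well-formed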
+			while (parser.nextToken() != null);
+		} catch (JsonParseException e) {
+			e.printStackTrace();
+			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An error occurred in the test: " + e.getMessage());
+		}
+	}
+}