You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:06 UTC

[27/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
deleted file mode 100644
index 8c19462..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
+++ /dev/null
@@ -1,1436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.junit.Test;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-
-public class FeedbackPropertiesMatchTest {
-
-	@Test
-	public void testNoPartialSolutionFoundSingleInputOnly() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-			
-			SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
-			
-			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap1.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(map1);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap2.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = new LocalProperties();
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
-				assertTrue(report == NO_PARTIAL_SOLUTION);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSingleInputOperators() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-			
-			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap1.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(map1);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap2.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			// no feedback properties and none are ever required and present
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = new LocalProperties();
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some global feedback properties and none are ever required and present
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = new LocalProperties();
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some local feedback properties and none are ever required and present
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some global and local feedback properties and none are ever required and present
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// --------------------------- requirements on channel 1 -----------------------
-			
-			// some required global properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = new LocalProperties();
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required local properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1, 2));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global and local properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1, 2));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global and local properties, which are over-fulfilled
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldSet(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global properties that are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 1));
-				LocalProperties lp = new LocalProperties();
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required local properties that are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required global and local properties where the global properties are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 1));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required global and local properties where the local properties are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// --------------------------- requirements on channel 2 -----------------------
-			
-			// some required global properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = new LocalProperties();
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required local properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1, 2));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global and local properties, which are matched exactly
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 5));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1, 2));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global and local properties, which are over-fulfilled
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldSet(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global properties that are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 1));
-				LocalProperties lp = new LocalProperties();
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldSet(2, 5));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required local properties that are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required global and local properties where the global properties are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(2, 1));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldSet(2, 5));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some required global and local properties where the local properties are not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// ---------------------- requirements mixed on 1 and 2 -----------------------
-			
-			// some required global properties at step one and some more at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties();
-				reqGp1.setAnyPartitioning(new FieldList(1, 2));
-				
-				RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties();
-				reqGp2.setHashPartitioned(new FieldList(1, 2));
-				
-				toMap1.setRequiredGlobalProps(reqGp1);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp2);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required local properties at step one and some more at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-				
-				RequestedLocalProperties reqLp1 = new RequestedLocalProperties();
-				reqLp1.setGroupedFields(new FieldList(3, 1));
-				
-				RequestedLocalProperties reqLp2 = new RequestedLocalProperties();
-				reqLp2.setOrdering(new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp1);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp2);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required global properties at step one and some local ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some required local properties at step one and some global ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some fulfilled global properties at step one and some non-fulfilled local ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 3));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some fulfilled local properties at step one and some non-fulfilled global ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(2, 3));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some non-fulfilled global properties at step one and some fulfilled local ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(2, 3));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// some non-fulfilled local properties at step one and some fulfilled global ones at step 2
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(1, 2));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(2, 1, 3));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSingleInputOperatorsWithReCreation() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-			
-			Channel toMap1 = new Channel(target);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(map1);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			// set ship strategy in first channel, so later non matching global properties do not matter
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldSet(2, 5));
-				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(reqGp);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(MET, report);
-			}
-			
-			// set ship strategy in second channel, so previous non matching global properties void the match
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldSet(2, 5));
-				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// set local strategy in first channel, so later non matching local properties do not matter
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(4, 1));
-				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// set local strategy in second channel, so previous non matching local properties void the match
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(4, 1));
-				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-				
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// create the properties on the same node as the requirement
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(1, 2));
-				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldSet(5, 7));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(5, 7));
-				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7), DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap1.setRequiredGlobalProps(reqGp);
-				toMap1.setRequiredLocalProps(reqLp);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(MET, report);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSingleInputOperatorsChainOfThree() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-			
-			Channel toMap1 = new Channel(target);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(map1);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			Channel toMap3 = new Channel(map2);
-			SingleInputPlanNode map3 = new SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP);
-			
-			// set local strategy in first channel, so later non matching local properties do not matter
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-				
-				RequestedLocalProperties reqLp = new RequestedLocalProperties();
-				reqLp.setGroupedFields(new FieldList(4, 1));
-				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap3.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				toMap3.setRequiredGlobalProps(null);
-				toMap3.setRequiredLocalProps(reqLp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// set global strategy in first channel, so later non matching global properties do not matter
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(5, 3));
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldSet(2, 3));
-				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2), DataExchangeMode.PIPELINED);
-				toMap1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toMap3.setLocalStrategy(LocalStrategy.NONE);
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toMap2.setRequiredGlobalProps(null);
-				toMap2.setRequiredLocalProps(null);
-				
-				toMap3.setRequiredGlobalProps(reqGp);
-				toMap3.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testNoPartialSolutionFoundTwoInputOperator() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
-
-			SourcePlanNode source1 = new SourcePlanNode(getSourceNode(), "Source 1");
-			SourcePlanNode source2 = new SourcePlanNode(getSourceNode(), "Source 2");
-			
-			Channel toMap1 = new Channel(source1);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap1.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(source2);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap2.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			Channel toJoin1 = new Channel(map1);
-			Channel toJoin2 = new Channel(map2);
-			
-			toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toJoin1.setLocalStrategy(LocalStrategy.NONE);
-			toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toJoin2.setLocalStrategy(LocalStrategy.NONE);
-			
-			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-			
-			FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, new GlobalProperties(), new LocalProperties());
-			assertEquals(NO_PARTIAL_SOLUTION, report);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoOperatorsOneIndependent() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
-			SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
-			
-			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap1.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(source);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap2.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			Channel toJoin1 = new Channel(map1);
-			Channel toJoin2 = new Channel(map2);
-			
-			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-			
-			Channel toAfterJoin = new Channel(join);
-			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
-			
-			// attach some properties to the non-relevant input
-			{
-				toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
-				toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
-				
-				RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
-				joinGp.setFullyReplicated();
-				
-				RequestedLocalProperties joinLp = new RequestedLocalProperties();
-				joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
-				
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin2.setLocalStrategy(LocalStrategy.NONE);
-				toJoin2.setRequiredGlobalProps(joinGp);
-				toJoin2.setRequiredLocalProps(joinLp);
-			}
-			
-			// ------------------------------------------------------------------------------------
-			
-			// no properties from the partial solution, no required properties
-			{
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some properties from the partial solution, no required properties
-			{
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			
-			// produced properties match relevant input
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2));
-				
-				toJoin1.setRequiredGlobalProps(rgp);
-				toJoin1.setRequiredLocalProps(rlp);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// produced properties do not match relevant input
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(1, 2, 3));
-				
-				toJoin1.setRequiredGlobalProps(rgp);
-				toJoin1.setRequiredLocalProps(rlp);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// produced properties overridden before join
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(rgp);
-				toMap1.setRequiredLocalProps(rlp);
-				
-				toJoin1.setRequiredGlobalProps(null);
-				toJoin1.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(MET, report);
-			}
-			
-			// produced properties before join match, after join match as well
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2, 1));
-				
-				toMap1.setRequiredGlobalProps(null);
-				toMap1.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toJoin1.setRequiredGlobalProps(rgp);
-				toJoin1.setRequiredLocalProps(rlp);
-			
-				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// produced properties before join match, after join do not match
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
-				rgp1.setHashPartitioned(new FieldList(0));
-				
-				RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
-				rgp2.setHashPartitioned(new FieldList(3));
-				
-				RequestedLocalProperties rlp1 = new RequestedLocalProperties();
-				rlp1.setGroupedFields(new FieldList(2, 1));
-				
-				RequestedLocalProperties rlp2 = new RequestedLocalProperties();
-				rlp2.setGroupedFields(new FieldList(3, 4));
-				
-				toJoin1.setRequiredGlobalProps(rgp1);
-				toJoin1.setRequiredLocalProps(rlp1);
-			
-				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp2);
-				toAfterJoin.setRequiredLocalProps(rlp2);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// produced properties are overridden, does not matter that they do not match
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setAnyPartitioning(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(1));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(1, 2, 3));
-				
-				toJoin1.setRequiredGlobalProps(null);
-				toJoin1.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(MET, report);
-			}
-			
-			// local property overridden before join, local property mismatch after join not relevant
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setAnyPartitioning(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(1, 2, 3));
-				
-				toJoin1.setRequiredGlobalProps(null);
-				toJoin1.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-				
-				toAfterJoin.setRequiredGlobalProps(null);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// local property overridden before join, global property mismatch after join void the match
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setAnyPartitioning(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(1));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(1, 2, 3));
-				
-				toJoin1.setRequiredGlobalProps(null);
-				toJoin1.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoOperatorsBothDependent() {
-		try {
-			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
-			
-			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap1.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-			
-			Channel toMap2 = new Channel(target);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toMap2.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-			
-			Channel toJoin1 = new Channel(map1);
-			toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toJoin1.setLocalStrategy(LocalStrategy.NONE);
-			
-			Channel toJoin2 = new Channel(map2);
-			toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toJoin2.setLocalStrategy(LocalStrategy.NONE);
-			
-			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-			
-			Channel toAfterJoin = new Channel(join);
-			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
-			
-			// no properties from the partial solution, no required properties
-			{
-				GlobalProperties gp = new GlobalProperties();
-				LocalProperties lp = LocalProperties.EMPTY;
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// some properties from the partial solution, no required properties
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// test requirements on one input and met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2, 1));
-				
-				toJoin1.setRequiredGlobalProps(rgp);
-				toJoin1.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// test requirements on both input and met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2, 1));
-				
-				toJoin1.setRequiredGlobalProps(rgp);
-				toJoin1.setRequiredLocalProps(rlp);
-				
-				toJoin2.setRequiredGlobalProps(rgp);
-				toJoin2.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
-			}
-			
-			// test requirements on both inputs, one not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
-				rgp1.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp1 = new RequestedLocalProperties();
-				rlp1.setGroupedFields(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
-				rgp2.setHashPartitioned(new FieldList(1));
-				
-				RequestedLocalProperties rlp2 = new RequestedLocalProperties();
-				rlp2.setGroupedFields(new FieldList(0, 3));
-				
-				toJoin1.setRequiredGlobalProps(rgp1);
-				toJoin1.setRequiredLocalProps(rlp1);
-				
-				toJoin2.setRequiredGlobalProps(rgp2);
-				toJoin2.setRequiredLocalProps(rlp2);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// test override on both inputs, later requirement ignored
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(1));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(0, 3));
-				
-				toJoin1.setRequiredGlobalProps(null);
-				toJoin1.setRequiredLocalProps(null);
-				
-				toJoin2.setRequiredGlobalProps(null);
-				toJoin2.setRequiredLocalProps(null);
-				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
-				toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(MET, report);
-			}
-			
-			// test override on one inputs, later requirement met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(0));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(2, 1));
-				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(PENDING, report);
-			}
-			
-			// test override on one input, later requirement not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(3));
-				
-				RequestedLocalProperties rlp = new RequestedLocalProperties();
-				rlp.setGroupedFields(new FieldList(77, 69));
-				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(rlp);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-			
-			// test override on one input locally, later global requirement not met
-			{
-				GlobalProperties gp = new GlobalProperties();
-				gp.setHashPartitioned(new FieldList(0));
-				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-				
-				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-				rgp.setHashPartitioned(new FieldList(3));
-				
-				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false });
-				
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-				toJoin1.setLocalStrategy(LocalStrategy.NONE);
-				
-				toAfterJoin.setRequiredGlobalProps(rgp);
-				toAfterJoin.setRequiredLocalProps(null);
-				
-				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
-				assertEquals(NOT_MET, report);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static DataSourceNode getSourceNode() {
-		return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
-	}
-	
-	private static MapNode getMapNode() {
-		return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
-	}
-	
-	private static JoinNode getJoinNode() {
-		return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
deleted file mode 100644
index 77d185d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case has been created to validate that correct strategies are used if orders within groups are
- * requested.
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class GroupOrderTest extends CompilerTestBase {
-
-	@Test
-	public void testReduceWithGroupOrder() {
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build();
-		Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING);
-		reduce.setGroupOrder(groupOrder);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink");
-		
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		OptimizedPlan oPlan;
-		try {
-			oPlan = compileNoStats(plan);
-		} catch(CompilerException ce) {
-			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
-			return; // silence the compiler
-		}
-		
-		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-		SinkPlanNode sinkNode = resolver.getNode("Sink");
-		SingleInputPlanNode reducer = resolver.getNode("Reduce");
-		
-		// verify the strategies
-		Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-		
-		Channel c = reducer.getInput();
-		Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
-		
-		FieldList ship = new FieldList(2);
-		FieldList local = new FieldList(2, 5);
-		Assert.assertEquals(ship, c.getShipStrategyKeys());
-		Assert.assertEquals(local, c.getLocalStrategyKeys());
-		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
-		
-		// check that we indeed sort descending
-		Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
-	}
-	
-	@Test
-	public void testCoGroupWithGroupOrder() {
-		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2");
-		
-		CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
-				.keyField(LongValue.class, 0, 0)
-				.name("CoGroup").input1(source1).input2(source2).build();
-		
-		Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
-		Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
-		groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
-		coGroup.setGroupOrderForInputOne(groupOrder1);
-		coGroup.setGroupOrderForInputTwo(groupOrder2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
-		
-		Plan plan = new Plan(sink, "Reduce Group Order Test");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		OptimizedPlan oPlan;
-		try {
-			oPlan = compileNoStats(plan);
-		} catch(CompilerException ce) {
-			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
-			return; // silence the compiler
-		}
-		
-		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-		SinkPlanNode sinkNode = resolver.getNode("Sink");
-		DualInputPlanNode coGroupNode = resolver.getNode("CoGroup");
-		
-		// verify the strategies
-		Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput1().getShipStrategy());
-		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput2().getShipStrategy());
-		
-		Channel c1 = coGroupNode.getInput1();
-		Channel c2 = coGroupNode.getInput2();
-		
-		Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
-		Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
-		
-		FieldList ship1 = new FieldList(new int[] {3, 0});
-		FieldList ship2 = new FieldList(new int[] {6, 0});
-		
-		FieldList local1 = new FieldList(new int[] {3, 0, 5});
-		FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
-		
-		Assert.assertEquals(ship1, c1.getShipStrategyKeys());
-		Assert.assertEquals(ship2, c2.getShipStrategyKeys());
-		Assert.assertEquals(local1, c1.getLocalStrategyKeys());
-		Assert.assertEquals(local2, c2.getLocalStrategyKeys());
-		
-		Assert.assertTrue(c1.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
-		Assert.assertTrue(c1.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
-		
-		// check that the local group orderings are correct
-		Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
deleted file mode 100644
index 6dadc19..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.junit.Test;
-
-/**
- * This class tests plans that once failed because of a bug:
- * <ul>
- *   <li> Ticket 158
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class HardPlansCompilationTest extends CompilerTestBase {
-	
-	/**
-	 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
-	 * |--------------------------/                  /
-	 * |--------------------------------------------/
-	 * 
-	 * First cross has SameKeyFirst output contract
-	 */
-	@Test
-	public void testTicket158() {
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		MapOperator map = MapOperator.builder(new IdentityMap()).name("Map1").input(source).build();
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build();
-		
-		CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build();
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build();
-		
-		CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build();
-		
-		ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setInput(reduce3);
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		OptimizedPlan oPlan = compileNoStats(plan);
-		JobGraphGenerator jobGen = new JobGraphGenerator();
-		jobGen.compileJobGraph(oPlan);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
deleted file mode 100644
index ac4f820..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class IterationsCompilerTest extends CompilerTestBase {
-
-	@Test
-	public void testSolutionSetDeltaDependsOnBroadcastVariable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Tuple2<Long, Long>> source =
-						env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
-			
-			DataSet<Tuple2<Long, Long>> invariantInput =
-					env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
-			
-			// iteration from here
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = source.iterateDelta(source, 1000, 1);
-			
-			DataSet<Tuple2<Long, Long>> result =
-				invariantInput
-					.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
-					.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
-			
-			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
-			
-			OptimizedPlan p = compileNoStats(env.createProgramPlan());
-			
-			// check that the JSON generator accepts this plan
-			new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
-			
-			// check that the JobGraphGenerator accepts the plan
-			new JobGraphGenerator().compileJobGraph(p);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoIterationsWithMapperInbetween() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-			
-			secondResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testIterationPushingWorkOut() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-			
-			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
-			
-			doBulkIteration(input1, input2).print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
-			
-			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			// check that work has not! been pushed out, as the end of the step function does not produce the necessary properties
-			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
-				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
-			}
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWorksetIterationPipelineBreakerPlacement() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
-			DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-			
-			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-			
-			// trivial iteration, since we are interested in the inputs to the iteration
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
-			
-			DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
-			
-			DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
-			
-			initialWorkset
-				.join(result, JoinHint.REPARTITION_HASH_FIRST)
-				.where(0).equalTo(0)
-				.print();
-			
-			Plan p = env.createProgramPlan();
-			compileNoStats(p);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testResetPartialSolution() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> width = env.generateSequence(1, 10);
-			DataSet<Long> update = env.generateSequence(1, 10);
-			DataSet<Long> lastGradient = env.generateSequence(1, 10);
-			
-			DataSet<Long> init = width.union(update).union(lastGradient);
-			
-			IterativeDataSet<Long> iteration = init.iterate(10);
-			
-			width = iteration.filter(new IdFilter<Long>());
-			update = iteration.filter(new IdFilter<Long>());
-			lastGradient = iteration.filter(new IdFilter<Long>());
-			
-			DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
-			DataSet<Long> term = gradient.join(lastGradient)
-								.where(new IdentityKeyExtractor<Long>())
-								.equalTo(new IdentityKeyExtractor<Long>())
-								.with(new JoinFunction<Long, Long, Long>() {
-									public Long join(Long first, Long second) { return null; }
-								});
-			
-			update = update.map(new RichMapFunction<Long, Long>() {
-				public Long map(Long value) { return null; }
-			}).withBroadcastSet(term, "some-name");
-			
-			DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
-			
-			result.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-		
-		// open a bulk iteration
-		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);
-		
-		DataSet<Tuple2<Long, Long>> changes = iteration
-				.join(edges).where(0).equalTo(0).with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(iteration).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		// close the bulk iteration
-		return iteration.closeWith(changes);
-	}
-				
-		
-	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
-		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
-				
-		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
-				.projectSecond(1);
-		
-		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
-		
-		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
-				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);
-		
-		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
-				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1);
-		
-		DataSet<Tuple2<Long, Long>> updatedComponentId = 
-				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-		
-		return depResult;
-		
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-			return null;
-		}
-	}
-	
-	public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-		
-		@Override
-		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
-	}
-	
-	public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-			return value;
-		}
-	}
-	
-	@ForwardedFields("0")
-	public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-		
-		@Override
-		public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
-	}
-	
-	@ForwardedFields("0")
-	public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
-			return new Tuple2<Long, Long>(value.f0, value.f0);
-		}
-	}
-	
-	public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> {
-
-		@Override
-		public Tuple2<T, T> map(T value) {
-			return new Tuple2<T, T>(value, value);
-		}
-	}
-	
-	public static final class IdFilter<T> implements FilterFunction<T> {
-		@Override
-		public boolean filter(T value) {
-			return true;
-		}
-	}
-}