You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:11 UTC

[1/6] [FLINK-935] (continued) Correct feedback property check for iterative algorithms.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 708426ba6 -> ca2b287a7


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
new file mode 100644
index 0000000..cdbea5b
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
@@ -0,0 +1,1432 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+import static eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.functions.GenericJoiner;
+import eu.stratosphere.api.common.functions.GenericMap;
+import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.OperatorInformation;
+import eu.stratosphere.api.common.operators.Order;
+import eu.stratosphere.api.common.operators.Ordering;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.GenericDataSourceBase;
+import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
+import eu.stratosphere.api.common.operators.base.MapOperatorBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.api.common.operators.util.FieldSet;
+import eu.stratosphere.api.java.io.TextInputFormat;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.compiler.dag.DataSourceNode;
+import eu.stratosphere.compiler.dag.MapNode;
+import eu.stratosphere.compiler.dag.MatchNode;
+import eu.stratosphere.compiler.dataproperties.GlobalProperties;
+import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.SourcePlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.pact.compiler.testfunctions.DummyJoinFunction;
+import eu.stratosphere.pact.compiler.testfunctions.IdentityMapper;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
+
+
+public class FeedbackPropertiesMatchTest {
+
+	/** Checks that a candidate node which is not wired into the examined plan yields NO_PARTIAL_SOLUTION. */
+	@Test
+	public void testNoPartialSolutionFoundSingleInputOnly() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+			// a second source that is never connected to the map chain built below
+			SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
+			
+			Channel toMap1 = new Channel(target);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(map1);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = new LocalProperties();
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
+				// assertEquals (rather than assertTrue on ==) prints the actual report when the check fails
+				assertEquals(NO_PARTIAL_SOLUTION, report);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** Checks feedback properties against requirements set on the channels of a Source -> Mapper 1 -> Mapper 2 chain. */
+	@Test
+	public void testSingleInputOperators() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+			
+			Channel toMap1 = new Channel(target);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(map1);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			// no feedback properties are present and no requirements are set on either channel
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = new LocalProperties();
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET); // accepts any non-null report except NO_PARTIAL_SOLUTION and NOT_MET
+			}
+			
+			// global feedback properties are present, but no requirements are set on either channel
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = new LocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// local feedback properties are present, but no requirements are set on either channel
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// global and local feedback properties are present, but no requirements are set on either channel
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// --------------------------- requirements on channel 1 (toMap1) -----------------------
+			
+			// some required global properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = new LocalProperties();
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required local properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1, 2));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required global and local properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1, 2));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// required global and local properties that are over-fulfilled: partitioned on a subset of the requested fields, grouped on a superset
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required global properties that are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 1));
+				LocalProperties lp = new LocalProperties();
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required local properties that are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required global and local properties where the global properties are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 1));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required global and local properties where the local properties are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// --------------------------- requirements on channel 2 (toMap2) -----------------------
+			
+			// some required global properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = new LocalProperties();
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required local properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1, 2));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required global and local properties, which are matched exactly
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 5));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1, 2));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// required global and local properties that are over-fulfilled: partitioned on a subset of the requested fields, grouped on a superset
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required global properties that are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 1));
+				LocalProperties lp = new LocalProperties();
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setHashPartitioned(new FieldList(2, 5));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required local properties that are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required global and local properties where the global properties are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(2, 1));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(2, 5));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some required global and local properties where the local properties are not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// ---------------------- requirements mixed on channels 1 and 2 -----------------------
+			
+			// a generic partitioning requirement at step 1 and a hash partitioning requirement on the same fields at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.EMPTY;
+				
+				RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties();
+				reqGp1.setAnyPartitioning(new FieldList(1, 2));
+				
+				RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties();
+				reqGp2.setHashPartitioned(new FieldList(1, 2));
+				
+				toMap1.setRequiredGlobalProps(reqGp1);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(reqGp2);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// a grouping requirement at step 1 and an ordering requirement (any direction) on the same fields at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+				
+				RequestedLocalProperties reqLp1 = new RequestedLocalProperties();
+				reqLp1.setGroupedFields(new FieldList(3, 1));
+				
+				RequestedLocalProperties reqLp2 = new RequestedLocalProperties();
+				reqLp2.setOrdering(new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp1);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp2);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required global properties at step one and some local ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some required local properties at step one and some global ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some fulfilled global properties at step one and some non-fulfilled local ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 3));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some fulfilled local properties at step one and some non-fulfilled global ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(2, 3));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some non-fulfilled global properties at step one and some fulfilled local ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(2, 3));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(reqGp);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// some non-fulfilled local properties at step one and some fulfilled global ones at step 2
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldList(1, 2));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(2, 1, 3));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(reqLp);
+				
+				toMap2.setRequiredGlobalProps(reqGp);
+				toMap2.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Two chained mappers whose channels re-create properties through their own ship or local strategy.
+	 */
+	@Test
+	public void testSingleInputOperatorsWithReCreation() {
+		try {
+			SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Source");
+			
+			Channel firstLink = new Channel(source);
+			SingleInputPlanNode firstMap = new SingleInputPlanNode(getMapNode(), "Mapper 1", firstLink, DriverStrategy.MAP);
+			
+			Channel secondLink = new Channel(firstMap);
+			SingleInputPlanNode secondMap = new SingleInputPlanNode(getMapNode(), "Mapper 2", secondLink, DriverStrategy.MAP);
+			
+			// channel 1 hash-partitions on (2, 5) itself, so the feedback partitioning is irrelevant for the requirement on channel 2
+			{
+				GlobalProperties fbGlobal = new GlobalProperties();
+				fbGlobal.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties fbLocal = LocalProperties.EMPTY;
+				
+				RequestedGlobalProperties wantedGlobal = new RequestedGlobalProperties();
+				wantedGlobal.setAnyPartitioning(new FieldSet(2, 5));
+				
+				firstLink.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+				firstLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				secondLink.setShipStrategy(ShipStrategyType.FORWARD);
+				secondLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				firstLink.setRequiredGlobalProps(null);
+				firstLink.setRequiredLocalProps(null);
+				
+				secondLink.setRequiredGlobalProps(wantedGlobal);
+				secondLink.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport verdict = secondMap.checkPartialSolutionPropertiesMet(source, fbGlobal, fbLocal);
+				assertEquals(MET, verdict);
+			}
+			
+			// the hash partitioning happens only on channel 2, after the unmet requirement on channel 1
+			{
+				GlobalProperties fbGlobal = new GlobalProperties();
+				fbGlobal.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties fbLocal = LocalProperties.EMPTY;
+				
+				RequestedGlobalProperties wantedGlobal = new RequestedGlobalProperties();
+				wantedGlobal.setAnyPartitioning(new FieldSet(2, 5));
+				
+				firstLink.setShipStrategy(ShipStrategyType.FORWARD);
+				firstLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				secondLink.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+				secondLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				firstLink.setRequiredGlobalProps(wantedGlobal);
+				firstLink.setRequiredLocalProps(null);
+				
+				secondLink.setRequiredGlobalProps(null);
+				secondLink.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport verdict = secondMap.checkPartialSolutionPropertiesMet(source, fbGlobal, fbLocal);
+				assertEquals(NOT_MET, verdict);
+			}
+			
+			// channel 1 applies a local sort, so the feedback's local properties are irrelevant for the requirement on channel 2
+			{
+				GlobalProperties fbGlobal = new GlobalProperties();
+				fbGlobal.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties fbLocal = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+				
+				RequestedLocalProperties wantedLocal = new RequestedLocalProperties();
+				wantedLocal.setGroupedFields(new FieldList(4, 1));
+				
+				firstLink.setShipStrategy(ShipStrategyType.FORWARD);
+				firstLink.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+				
+				secondLink.setShipStrategy(ShipStrategyType.FORWARD);
+				secondLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				firstLink.setRequiredGlobalProps(null);
+				firstLink.setRequiredLocalProps(null);
+				
+				secondLink.setRequiredGlobalProps(null);
+				secondLink.setRequiredLocalProps(wantedLocal);
+				
+				FeedbackPropertiesMeetRequirementsReport verdict = secondMap.checkPartialSolutionPropertiesMet(source, fbGlobal, fbLocal);
+				assertFalse(verdict == null || verdict == NO_PARTIAL_SOLUTION || verdict == NOT_MET);
+			}
+			
+			// the local sort sits on channel 2, after the unmet local requirement on channel 1
+			{
+				GlobalProperties fbGlobal = new GlobalProperties();
+				fbGlobal.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties fbLocal = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+				
+				RequestedLocalProperties wantedLocal = new RequestedLocalProperties();
+				wantedLocal.setGroupedFields(new FieldList(4, 1));
+				
+				firstLink.setShipStrategy(ShipStrategyType.FORWARD);
+				firstLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				secondLink.setShipStrategy(ShipStrategyType.FORWARD);
+				secondLink.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+				
+				firstLink.setRequiredGlobalProps(null);
+				firstLink.setRequiredLocalProps(wantedLocal);
+				
+				secondLink.setRequiredGlobalProps(null);
+				secondLink.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport verdict = secondMap.checkPartialSolutionPropertiesMet(source, fbGlobal, fbLocal);
+				assertEquals(NOT_MET, verdict);
+			}
+			
+			// channel 1 both carries the requirements and produces the properties: hash partitioning and sort on (5, 7)
+			{
+				GlobalProperties fbGlobal = new GlobalProperties();
+				fbGlobal.setHashPartitioned(new FieldList(1, 2));
+				LocalProperties fbLocal = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+				
+				RequestedGlobalProperties wantedGlobal = new RequestedGlobalProperties();
+				wantedGlobal.setAnyPartitioning(new FieldSet(5, 7));
+				
+				RequestedLocalProperties wantedLocal = new RequestedLocalProperties();
+				wantedLocal.setGroupedFields(new FieldList(5, 7));
+				
+				firstLink.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7));
+				firstLink.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+				
+				secondLink.setShipStrategy(ShipStrategyType.FORWARD);
+				secondLink.setLocalStrategy(LocalStrategy.NONE);
+				
+				firstLink.setRequiredGlobalProps(wantedGlobal);
+				firstLink.setRequiredLocalProps(wantedLocal);
+				
+				secondLink.setRequiredGlobalProps(null);
+				secondLink.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport verdict = secondMap.checkPartialSolutionPropertiesMet(source, fbGlobal, fbLocal);
+				assertEquals(MET, verdict);
+			}
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail(ex.getMessage());
+		}
+	}
+	/** Chains three mappers behind the partial solution and checks that a strategy set on the first channel shadows a non-matching requirement on the third channel. */
+	@Test
+	public void testSingleInputOperatorsChainOfThree() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+			
+			Channel toMap1 = new Channel(target);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(map1);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			Channel toMap3 = new Channel(map2);
+			SingleInputPlanNode map3 = new SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP);
+			
+			// SORT local strategy on the first channel (toMap1): the grouping requested on the third channel (toMap3) does not match the feedback ordering, yet the report must not be NOT_MET
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+				
+				RequestedLocalProperties reqLp = new RequestedLocalProperties();
+				reqLp.setGroupedFields(new FieldList(4, 1));
+				
+				toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+				
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setLocalStrategy(LocalStrategy.NONE);
+				
+				toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap3.setLocalStrategy(LocalStrategy.NONE);
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				toMap3.setRequiredGlobalProps(null);
+				toMap3.setRequiredLocalProps(reqLp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// PARTITION_HASH ship strategy on the first channel (toMap1): the partitioning requested on the third channel (toMap3) does not match the feedback partitioning, yet the report must not be NOT_MET
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(5, 3));
+				LocalProperties lp = LocalProperties.EMPTY;
+				
+				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+				reqGp.setAnyPartitioning(new FieldSet(2, 3));
+				
+				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2));
+				toMap1.setLocalStrategy(LocalStrategy.NONE);
+				
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setLocalStrategy(LocalStrategy.NONE);
+				
+				toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap3.setLocalStrategy(LocalStrategy.NONE);
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toMap2.setRequiredGlobalProps(null);
+				toMap2.setRequiredLocalProps(null);
+				
+				toMap3.setRequiredGlobalProps(reqGp);
+				toMap3.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	/** Neither join input is built on the "Partial Solution" node handed to the check, so the expected report is NO_PARTIAL_SOLUTION. */
+	@Test
+	public void testNoPartialSolutionFoundTwoInputOperator() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
+
+			SourcePlanNode source1 = new SourcePlanNode(getSourceNode(), "Source 1");
+			SourcePlanNode source2 = new SourcePlanNode(getSourceNode(), "Source 2");
+			
+			Channel toMap1 = new Channel(source1);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(source2);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			Channel toJoin1 = new Channel(map1);
+			Channel toJoin2 = new Channel(map2);
+			
+			toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin1.setLocalStrategy(LocalStrategy.NONE);
+			toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin2.setLocalStrategy(LocalStrategy.NONE);
+			
+			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			
+			FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, new GlobalProperties(), new LocalProperties());
+			assertEquals(NO_PARTIAL_SOLUTION, report);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoOperatorsOneIndependent() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
+			SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
+			
+			Channel toMap1 = new Channel(target);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(source);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			Channel toJoin1 = new Channel(map1);
+			Channel toJoin2 = new Channel(map2);
+			
+			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			
+			Channel toAfterJoin = new Channel(join);
+			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
+			
+			// attach some properties to the non-relevant input
+			{
+				toMap2.setShipStrategy(ShipStrategyType.BROADCAST);
+				toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
+				
+				RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
+				joinGp.setFullyReplicated();
+				
+				RequestedLocalProperties joinLp = new RequestedLocalProperties();
+				joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
+				
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin2.setLocalStrategy(LocalStrategy.NONE);
+				toJoin2.setRequiredGlobalProps(joinGp);
+				toJoin2.setRequiredLocalProps(joinLp);
+			}
+			
+			// ------------------------------------------------------------------------------------
+			
+			// no properties from the partial solution, no required properties
+			{
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.NONE);
+				
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.EMPTY;
+				
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some properties from the partial solution, no required properties
+			{
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.NONE);
+				
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			
+			// produced properties match relevant input
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2));
+				
+				toJoin1.setRequiredGlobalProps(rgp);
+				toJoin1.setRequiredLocalProps(rlp);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.NONE);
+				
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// produced properties do not match relevant input
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(1, 2, 3));
+				
+				toJoin1.setRequiredGlobalProps(rgp);
+				toJoin1.setRequiredLocalProps(rlp);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.NONE);
+				
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// produced properties overridden before join
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(rgp);
+				toMap1.setRequiredLocalProps(rlp);
+				
+				toJoin1.setRequiredGlobalProps(null);
+				toJoin1.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+				
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(MET, report);
+			}
+			
+			// produced properties before join match, after join match as well
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2, 1));
+				
+				toMap1.setRequiredGlobalProps(null);
+				toMap1.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.NONE);
+				
+				toJoin1.setRequiredGlobalProps(rgp);
+				toJoin1.setRequiredLocalProps(rlp);
+			
+				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				// NOTE(review): the check below starts at 'join', so the toAfterJoin requirements set above do not appear to be consulted; presumably 'afterJoin' was intended - TODO confirm
+				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// produced properties before join match, after join do not match
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
+				rgp1.setHashPartitioned(new FieldList(0));
+				
+				RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
+				rgp2.setHashPartitioned(new FieldList(3));
+				
+				RequestedLocalProperties rlp1 = new RequestedLocalProperties();
+				rlp1.setGroupedFields(new FieldList(2, 1));
+				
+				RequestedLocalProperties rlp2 = new RequestedLocalProperties();
+				rlp2.setGroupedFields(new FieldList(3, 4));
+				
+				toJoin1.setRequiredGlobalProps(rgp1);
+				toJoin1.setRequiredLocalProps(rlp1);
+			
+				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp2);
+				toAfterJoin.setRequiredLocalProps(rlp2);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// produced properties are overridden, does not matter that they do not match
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setAnyPartitioning(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(1));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(1, 2, 3));
+				
+				toJoin1.setRequiredGlobalProps(null);
+				toJoin1.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(MET, report);
+			}
+			
+			// local property overridden before join, local property mismatch after join not relevant
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setAnyPartitioning(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(1, 2, 3));
+				
+				toJoin1.setRequiredGlobalProps(null);
+				toJoin1.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+				
+				toAfterJoin.setRequiredGlobalProps(null);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// local property overridden before join, global property mismatch after join void the match
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setAnyPartitioning(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(1));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(1, 2, 3));
+				
+				toJoin1.setRequiredGlobalProps(null);
+				toJoin1.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoOperatorsBothDependent() {
+		try {
+			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
+			
+			Channel toMap1 = new Channel(target);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+			
+			Channel toMap2 = new Channel(target);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+			
+			Channel toJoin1 = new Channel(map1);
+			toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin1.setLocalStrategy(LocalStrategy.NONE);
+			
+			Channel toJoin2 = new Channel(map2);
+			toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin2.setLocalStrategy(LocalStrategy.NONE);
+			
+			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			
+			Channel toAfterJoin = new Channel(join);
+			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
+			
+			// no properties from the partial solution, no required properties
+			{
+				GlobalProperties gp = new GlobalProperties();
+				LocalProperties lp = LocalProperties.EMPTY;
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// some properties from the partial solution, no required properties
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// test requirements on one input and met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2, 1));
+				
+				toJoin1.setRequiredGlobalProps(rgp);
+				toJoin1.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// test requirements on both input and met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2, 1));
+				
+				toJoin1.setRequiredGlobalProps(rgp);
+				toJoin1.setRequiredLocalProps(rlp);
+				
+				toJoin2.setRequiredGlobalProps(rgp);
+				toJoin2.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+			}
+			
+			// test requirements on both inputs, one not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
+				rgp1.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp1 = new RequestedLocalProperties();
+				rlp1.setGroupedFields(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
+				rgp2.setHashPartitioned(new FieldList(1));
+				
+				RequestedLocalProperties rlp2 = new RequestedLocalProperties();
+				rlp2.setGroupedFields(new FieldList(0, 3));
+				
+				toJoin1.setRequiredGlobalProps(rgp1);
+				toJoin1.setRequiredLocalProps(rlp1);
+				
+				toJoin2.setRequiredGlobalProps(rgp2);
+				toJoin2.setRequiredLocalProps(rlp2);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// test override on both inputs, later requirement ignored
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(1));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(0, 3));
+				
+				toJoin1.setRequiredGlobalProps(null);
+				toJoin1.setRequiredLocalProps(null);
+				
+				toJoin2.setRequiredGlobalProps(null);
+				toJoin2.setRequiredLocalProps(null);
+				
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+				toJoin2.setShipStrategy(ShipStrategyType.BROADCAST);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(MET, report);
+			}
+			
+			// test override on one inputs, later requirement met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(0));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(2, 1));
+				
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(PENDING, report);
+			}
+			
+			// test override on one input, later requirement not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(3));
+				
+				RequestedLocalProperties rlp = new RequestedLocalProperties();
+				rlp.setGroupedFields(new FieldList(77, 69));
+				
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(rlp);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+			
+			// test override on one input locally (SORT on toJoin1 only), later global requirement not met
+			{
+				GlobalProperties gp = new GlobalProperties();
+				gp.setHashPartitioned(new FieldList(0));
+				LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+				
+				RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+				rgp.setHashPartitioned(new FieldList(3));
+				
+				// both inputs keep the FORWARD ship strategy; only the first input gets a local strategy
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false });
+				
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin2.setLocalStrategy(LocalStrategy.NONE);
+				
+				toAfterJoin.setRequiredGlobalProps(rgp);
+				toAfterJoin.setRequiredLocalProps(null);
+				
+				FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+				assertEquals(NOT_MET, report);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	/** Creates a DataSourceNode over a GenericDataSourceBase that uses a TextInputFormat on path "/" and String type information. */
+	private static final DataSourceNode getSourceNode() {
+		return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
+	}
+	/** Creates a MapNode over a MapOperatorBase named "map op" that wraps an IdentityMapper with String input and output type information. */
+	private static final MapNode getMapNode() {
+		return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
+	}
+	/** Creates a MatchNode over a JoinOperatorBase named "join op" that wraps a DummyJoinFunction; {1} and {2} are presumably the key positions of the first and second input - TODO confirm. */
+	private static final MatchNode getJoinNode() {
+		return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
index a123099..05a863c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -38,7 +38,6 @@ import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.OptimizedPlan;
 import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.util.Collector;
 
@@ -164,8 +163,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
-			
 			assertEquals(1, op.getDataSinks().size());
 			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
new file mode 100644
index 0000000..aed8f26
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler.testfunctions;
+
+import eu.stratosphere.api.java.functions.JoinFunction;
+/** Join function stub from the compiler test-functions package; its join() never produces a real result. */
+public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+	/** Ignores both arguments and always returns null. */
+	@Override
+	public T join(T first, T second) {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
index 00764d8..3be5009 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
@@ -24,6 +24,7 @@ import eu.stratosphere.api.java.ExecutionEnvironment;
 import eu.stratosphere.api.java.IterativeDataSet;
 import eu.stratosphere.api.java.functions.FilterFunction;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
 import eu.stratosphere.api.java.functions.GroupReduceFunction;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.tuple.Tuple1;
@@ -94,7 +95,7 @@ public class PageRankBasic {
 		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
 				map(new RankAssigner((1.0d / numPages)));
 		
-		// build adjecency list from link input
+		// build adjacency list from link input
 		DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
 				linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
 		
@@ -150,8 +151,9 @@ public class PageRankBasic {
 	
 	/**
 	 * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
-	 * originate. Run as a preprocessing step.
+	 * originate. Run as a pre-processing step.
 	 */
+	@ConstantFields("0")
 	public static final class BuildOutgoingEdgeList extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
 		
 		private final ArrayList<Long> neighbors = new ArrayList<Long>();
@@ -190,6 +192,7 @@ public class PageRankBasic {
 	/**
 	 * The function that applies the page rank dampening formula
 	 */
+	@ConstantFields("0")
 	public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
 
 		private final double dampening;
@@ -275,5 +278,4 @@ public class PageRankBasic {
 			return PageRankData.getDefaultEdgeDataSet(env);
 		}
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 882c3a9..e4f0a4b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -14,6 +14,7 @@
 package eu.stratosphere.pact.runtime.cache;
 
 import eu.stratosphere.api.common.cache.DistributedCache;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -64,7 +65,7 @@ public class FileCache {
 	 */
 	public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
 		synchronized (count) {
-			Pair<JobID, String> key = new ImmutablePair(jobID, name);
+			Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
 			if (count.containsKey(key)) {
 				count.put(key, count.get(key) + 1);
 			} else {
@@ -84,7 +85,7 @@ public class FileCache {
 	 * @param jobID
 	 */
 	public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
-		DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair(jobID,name)));
+		DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair<JobID, String>(jobID,name)));
 		executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
 	}
 
@@ -138,6 +139,7 @@ public class FileCache {
 	 */
 	private class CopyProcess implements Callable<Path> {
 		private JobID jobID;
+		@SuppressWarnings("unused")
 		private String name;
 		private String filePath;
 		private Boolean executable;
@@ -167,6 +169,7 @@ public class FileCache {
 	 */
 	private class DeleteProcess implements Runnable {
 		private String name;
+		@SuppressWarnings("unused")
 		private String filePath;
 		private JobID jobID;
 		private int oldCount;
@@ -180,7 +183,7 @@ public class FileCache {
 		@Override
 		public void run() {
 			synchronized (count) {
-				if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+				if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) {
 					return;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index a86c209..a8a1293 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -25,7 +25,7 @@ public enum ShipStrategyType {
 	NONE(false, false, false),
 	
 	/**
-	 * Forwarding the data locally in memory.
+	 * Forwarding the data preserving all global properties.
 	 */
 	FORWARD(false, false, false),
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
index c380431..d23b0f3 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -124,7 +124,7 @@ public class NettyConnectionManagerTest {
 	}
 
 
-	private class VerifyEnvelopes implements Answer {
+	private class VerifyEnvelopes implements Answer<Object> {
 
 		private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
index 3f4d258..9c8c0c8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
@@ -102,7 +102,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
 		
-//		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
 		
@@ -182,7 +182,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
 		
-//		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
new file mode 100644
index 0000000..547a453
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
@@ -0,0 +1,108 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.compiler.iterations;
+
+import static eu.stratosphere.api.java.aggregation.Aggregations.SUM;
+import static org.junit.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.PactCompiler;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.SinkPlanNode;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.example.java.graph.PageRankBasic.BuildOutgoingEdgeList;
+import eu.stratosphere.example.java.graph.PageRankBasic.Dampener;
+import eu.stratosphere.example.java.graph.PageRankBasic.EpsilonFilter;
+import eu.stratosphere.example.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
+import eu.stratosphere.example.java.graph.PageRankBasic.RankAssigner;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
+import eu.stratosphere.test.compiler.util.CompilerTestBase;
+
+public class PageRankCompilerTest extends CompilerTestBase{
+	
+	@Test
+	public void testPageRank() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			// get input data
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple1<Long>> pagesInput = env.fromElements(new Tuple1<Long>(1l));
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> linksInput =env.fromElements(new Tuple2<Long, Long>(1l, 2l));
+			
+			// assign initial rank to pages
+			DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
+					map(new RankAssigner((1.0d / 10)));
+			
+			// build adjacency list from link input
+			DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
+					linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+			
+			// set iterative data set
+			IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
+			
+			Configuration cfg = new Configuration();
+			cfg.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			
+			DataSet<Tuple2<Long, Double>> newRanks = iteration
+					// join pages with outgoing edges and distribute rank
+					.join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
+					.flatMap(new JoinVertexWithEdgesMatch())
+					// collect and sum ranks
+					.groupBy(0).aggregate(SUM, 1)
+					// apply dampening factor
+					.map(new Dampener(0.85, 10));
+			
+			DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+					newRanks, 
+					newRanks.join(iteration).where(0).equalTo(0)
+					// termination condition
+					.filter(new EpsilonFilter()));
+	
+			finalPageRanks.print();
+	
+			// get the plan and compile it
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
+			
+			// check that the partitioning is pushed out of the first loop
+			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
+			Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
+			
+			BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
+			Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[3/6] git commit: Made FieldSet and FieldList immutable collections to prevent bugs due to side effect mutations

Posted by se...@apache.org.
Made FieldSet and FieldList immutable collections to prevent bugs due to side effect mutations


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/202aa4a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/202aa4a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/202aa4a7

Branch: refs/heads/master
Commit: 202aa4a7a20d0029096a341ae4ba05e16aaab83b
Parents: 31a3739
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 07:47:54 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../pact/compiler/GroupOrderTest.java           |   3 +-
 .../operators/DualInputSemanticProperties.java  |  60 +++----
 .../api/common/operators/Ordering.java          |  13 +-
 .../common/operators/SemanticProperties.java    |   5 +-
 .../SingleInputSemanticProperties.java          |  30 ++--
 .../api/common/operators/util/FieldList.java    |  99 +++++++++--
 .../api/common/operators/util/FieldSet.java     | 176 ++++++++++++++-----
 .../common/operators/util/FieldListTest.java    | 105 +++++++++++
 .../api/common/operators/util/FieldSetTest.java | 110 ++++++++++++
 .../api/java/functions/SemanticPropUtil.java    |   4 +-
 .../stratosphere/api/scala/CompilerHints.scala  |  17 +-
 11 files changed, 495 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
index da72c94..e170203 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
@@ -87,8 +87,7 @@ public class GroupOrderTest extends CompilerTestBase  {
 		Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
 		
 		FieldList ship = new FieldList(2);
-		FieldList local = new FieldList(2);
-		local.add(5);
+		FieldList local = new FieldList(2, 5);
 		Assert.assertEquals(ship, c.getShipStrategyKeys());
 		Assert.assertEquals(local, c.getLocalStrategyKeys());
 		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders()[0]);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
index c6b22f2..1ed399a 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
@@ -63,13 +63,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param destinationField the position in the destination record(s)
 	 */
 	public void addForwardedField1(int sourceField, int destinationField) {
-		FieldSet fs;
-		if((fs = this.forwardedFields1.get(sourceField)) != null) {
-			fs.add(destinationField);
-		} else {
-			fs = new FieldSet(destinationField);
-			this.forwardedFields1.put(sourceField, fs);
+		FieldSet old = this.forwardedFields1.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addField(destinationField);
+		this.forwardedFields1.put(sourceField, fs);
 	}
 	
 	/**
@@ -81,13 +81,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param destinationFields the position in the destination record(s)
 	 */
 	public void addForwardedField1(int sourceField, FieldSet destinationFields) {
-		FieldSet fs;
-		if((fs = this.forwardedFields1.get(sourceField)) != null) {
-			fs.addAll(destinationFields);
-		} else {
-			fs = new FieldSet(destinationFields);
-			this.forwardedFields1.put(sourceField, fs);
+		FieldSet old = this.forwardedFields1.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addFields(destinationFields);
+		this.forwardedFields1.put(sourceField, fs);
 	}
 	
 	/**
@@ -122,13 +122,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param destinationField the position in the destination record(s)
 	 */
 	public void addForwardedField2(int sourceField, int destinationField) {
-		FieldSet fs;
-		if((fs = this.forwardedFields2.get(sourceField)) != null) {
-			fs.add(destinationField);
-		} else {
-			fs = new FieldSet(destinationField);
-			this.forwardedFields2.put(sourceField, fs);
+		FieldSet old = this.forwardedFields2.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addField(destinationField);
+		this.forwardedFields2.put(sourceField, fs);
 	}
 	
 	/**
@@ -140,13 +140,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param destinationFields the position in the destination record(s)
 	 */
 	public void addForwardedField2(int sourceField, FieldSet destinationFields) {
-		FieldSet fs;
-		if((fs = this.forwardedFields2.get(sourceField)) != null) {
-			fs.addAll(destinationFields);
-		} else {
-			fs = new FieldSet(destinationFields);
-			this.forwardedFields2.put(sourceField, fs);
+		FieldSet old = this.forwardedFields2.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addFields(destinationFields);
+		this.forwardedFields2.put(sourceField, fs);
 	}
 	
 	/**
@@ -179,10 +179,10 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param readFields the position(s) in the source record(s)
 	 */
 	public void addReadFields1(FieldSet readFields) {
-		if(this.readFields1 == null) {
-			this.readFields1 = new FieldSet(readFields);
+		if (this.readFields1 == null) {
+			this.readFields1 = readFields;
 		} else {
-			this.readFields1.addAll(readFields);
+			this.readFields1 = this.readFields1.addFields(readFields); // merge into input 1's own read set, not input 2's
 		}
 	}
 	
@@ -213,10 +213,10 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 * @param readFields the position(s) in the source record(s)
 	 */
 	public void addReadFields2(FieldSet readFields) {
-		if(this.readFields2 == null) {
-			this.readFields2 = new FieldSet(readFields);
+		if (this.readFields2 == null) {
+			this.readFields2 = readFields;
 		} else {
-			this.readFields2.addAll(readFields);
+			this.readFields2 = this.readFields2.addFields(readFields);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
index f63bdb9..a9895ec 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
@@ -24,7 +24,7 @@ import eu.stratosphere.types.Key;
  */
 public class Ordering {
 	
-	protected final FieldList indexes = new FieldList();
+	protected FieldList indexes = new FieldList();
 	
 	protected final ArrayList<Class<? extends Key<?>>> types = new ArrayList<Class<? extends Key<?>>>();
 	
@@ -33,7 +33,7 @@ public class Ordering {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * 
+	 * Creates an empty ordering.
 	 */
 	public Ordering() {}
 	
@@ -66,7 +66,7 @@ public class Ordering {
 			throw new IllegalArgumentException("An ordering must not be created with a NONE order.");
 		}
 		
-		this.indexes.add(index);
+		this.indexes = this.indexes.addField(index);
 		this.types.add(type);
 		this.orders.add(order);
 		return this;
@@ -210,10 +210,9 @@ public class Ordering {
 	
 	
 
-	public Ordering clone()
-	{
-		final Ordering newOrdering = new Ordering();
-		newOrdering.indexes.addAll(this.indexes);
+	public Ordering clone() {
+		Ordering newOrdering = new Ordering();
+		newOrdering.indexes = this.indexes;
 		newOrdering.types.addAll(this.types);
 		newOrdering.orders.addAll(this.orders);
 		return newOrdering;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
index 8c375fc..50af69f 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
@@ -21,6 +21,7 @@ import eu.stratosphere.api.common.operators.util.FieldSet;
  * Container for the semantic properties associated to an operator.
  */
 public abstract class SemanticProperties implements Serializable {
+	
 	private static final long serialVersionUID = 1L;
 
 	/**
@@ -36,9 +37,9 @@ public abstract class SemanticProperties implements Serializable {
 	 */
 	public void addWrittenFields(FieldSet writtenFields) {
 		if(this.writtenFields == null) {
-			this.writtenFields = new FieldSet(writtenFields);
+			this.writtenFields = writtenFields;
 		} else {
-			this.writtenFields.addAll(writtenFields);
+			this.writtenFields = this.writtenFields.addFields(writtenFields);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
index 1a6841d..f1d098a 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
@@ -49,13 +49,13 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 	 * @param destinationField the position in the destination record(s)
 	 */
 	public void addForwardedField(int sourceField, int destinationField) {
-		FieldSet fs;
-		if((fs = this.forwardedFields.get(sourceField)) != null) {
-			fs.add(destinationField);
-		} else {
-			fs = new FieldSet(destinationField);
-			this.forwardedFields.put(sourceField, fs);
+		FieldSet old = this.forwardedFields.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addField(destinationField);
+		this.forwardedFields.put(sourceField, fs);
 	}
 	
 	/**
@@ -67,13 +67,13 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 	 * @param destinationFields the position in the destination record(s)
 	 */
 	public void addForwardedField(int sourceField, FieldSet destinationFields) {
-		FieldSet fs;
-		if((fs = this.forwardedFields.get(sourceField)) != null) {
-			fs.addAll(destinationFields);
-		} else {
-			fs = new FieldSet(destinationFields);
-			this.forwardedFields.put(sourceField, fs);
+		FieldSet old = this.forwardedFields.get(sourceField);
+		if (old == null) {
+			old = FieldSet.EMPTY_SET;
 		}
+		
+		FieldSet fs = old.addFields(destinationFields);
+		this.forwardedFields.put(sourceField, fs);
 	}
 	
 	/**
@@ -105,10 +105,10 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 	 * @param readFields the position(s) in the source record(s)
 	 */
 	public void addReadFields(FieldSet readFields) {
-		if(this.readFields == null) {
-			this.readFields = new FieldSet(readFields);
+		if (this.readFields == null) {
+			this.readFields = readFields;
 		} else {
-			this.readFields.addAll(readFields);
+			this.readFields = this.readFields.addFields(readFields);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
index 96b0f34..5b7b8da 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
@@ -14,42 +14,106 @@
 package eu.stratosphere.api.common.operators.util;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.lang3.Validate;
 
 /**
- *
+ * Immutable ordered list of field IDs.
  */
 public class FieldList extends FieldSet {
 	
+	public static final FieldList EMPTY_LIST = new FieldList();
+	
+	
 	public FieldList() {
-		super();
+		super(Collections.<Integer>emptyList());
+	}
+	
+	public FieldList(int fieldId) {
+		super(Collections.singletonList(fieldId));
 	}
 	
-	public FieldList(int columnIndex) {
-		this();
-		add(columnIndex);
+	public FieldList(Integer fieldId) {
+		super(Collections.singletonList(Validate.notNull(fieldId, "The fields ID must not be null.")));
 	}
 	
 	public FieldList(int... columnIndexes) {
-		this();
-		addAll(columnIndexes);
+		super (fromInts(columnIndexes));
+	}
+	
+	private FieldList(List<Integer> fields) {
+		super(fields);
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
+	@Override
+	public FieldList addField(Integer fieldID) {
+		if (fieldID == null) {
+			throw new IllegalArgumentException("Field ID must not be null.");
+		}
+		
+		if (size() == 0) {
+			return new FieldList(fieldID);
+		} else {
+			ArrayList<Integer> list = new ArrayList<Integer>(size() + 1);
+			list.addAll(this.collection);
+			list.add(fieldID);
+			return new FieldList(Collections.unmodifiableList(list));
+		}
+	}
+
+	@Override
+	public FieldList addFields(int... fieldIDs) {
+		if (fieldIDs == null || fieldIDs.length == 0) {
+			return this;
+		}
+		if (size() == 0) {
+			return new FieldList(fieldIDs);
+		} else {
+			ArrayList<Integer> list = new ArrayList<Integer>(size() + fieldIDs.length);
+			list.addAll(this.collection);
+			for (int i = 0; i < fieldIDs.length; i++) {
+				list.add(fieldIDs[i]);
+			}
+			
+			return new FieldList(Collections.unmodifiableList(list));
+		}
+	}
+	
+	@Override
+	public FieldList addFields(FieldSet set) {
+		if (set == null) {
+			throw new IllegalArgumentException("FieldSet to add must not be null.");
+		}
+		
+		if (set.size() == 0) {
+			return this;
+		}
+		else if (size() == 0 && set instanceof FieldList) {
+			return (FieldList) set;
+		}
+		else {
+			ArrayList<Integer> list = new ArrayList<Integer>(size() + set.size());
+			list.addAll(this.collection);
+			list.addAll(set.collection);
+			return new FieldList(Collections.unmodifiableList(list));
+		}
+	}
+	
 	public Integer get(int pos) {
 		return get().get(pos);
 	}
 	
+	@Override
 	public FieldList toFieldList() {
 		return this;
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-
 	@Override
 	public boolean isValidSubset(FieldSet set) {
 		if (set instanceof FieldList) {
@@ -90,11 +154,6 @@ public class FieldList extends FieldSet {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected Collection<Integer> initCollection() {
-		return new ArrayList<Integer>();
-	}
 
 	@Override
 	protected String getDescriptionPrefix() {
@@ -109,4 +168,16 @@ public class FieldList extends FieldSet {
 	private List<Integer> get() {
 		return (List<Integer>) this.collection;
 	}
+	
+	private static final List<Integer> fromInts(int... ints) {
+		if (ints == null || ints.length == 0) {
+			return Collections.emptyList();
+		} else {
+			ArrayList<Integer> al = new ArrayList<Integer>(ints.length);
+			for (int i = 0; i < ints.length; i++) {
+				al.add(ints[i]);
+			}
+			return Collections.unmodifiableList(al);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
index 3738319..a1d1b03 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
@@ -15,67 +15,153 @@ package eu.stratosphere.api.common.operators.util;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 
 /**
- *
+ * Immutable unordered collection of field IDs.
  */
 public class FieldSet implements Iterable<Integer> {
 	
+	public static final FieldSet EMPTY_SET = new FieldSet();
+	
 	protected final Collection<Integer> collection;
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Creates a new empty set of fields.
+	 */
 	public FieldSet() {
-		this.collection = initCollection();
+		this.collection = Collections.emptySet();
 	}
 	
-	public FieldSet(int columnIndex) {
-		this();
-		add(columnIndex);
+	/**
+	 * Creates a set with one field.
+	 * 
+	 * @param fieldID The id of the field.
+	 */
+	public FieldSet(Integer fieldID) {
+		if (fieldID == null) {
+			throw new IllegalArgumentException("Field ID must not be null.");
+		}
+		
+		this.collection = Collections.singleton(fieldID);
 	}
 	
-	public FieldSet(int[] columnIndexes) {
-		this();
-		addAll(columnIndexes);
+	/**
+	 * Creates a set with the given fields.
+	 * 
+	 * @param fieldIDs The IDs of the fields.
+	 */
+	public FieldSet(int... fieldIDs) {
+		if (fieldIDs == null || fieldIDs.length == 0) {
+			this.collection = Collections.emptySet();
+		} else {
+			HashSet<Integer> set = new HashSet<Integer>(2 * fieldIDs.length);
+			for (int i = 0; i < fieldIDs.length; i++) {
+				set.add(fieldIDs[i]);
+			}
+			
+			this.collection = Collections.unmodifiableSet(set);
+		}
 	}
 	
-	public FieldSet(Collection<Integer> o) {
-		this();
-		addAll(o);
+	/**
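+	 * Creates a set with the given fields. The {@code marker} argument is ignored; presumably it only distinguishes this overload from the varargs constructor.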
+	 * 
+	 * @param fieldIDs The IDs of the fields.
+	 */
+	public FieldSet(int[] fieldIDs, boolean marker) {
+		if (fieldIDs == null || fieldIDs.length == 0) {
+			this.collection = Collections.emptySet();
+		} else {
+			HashSet<Integer> set = new HashSet<Integer>(2 * fieldIDs.length);
+			for (int i = 0; i < fieldIDs.length; i++) {
+				set.add(fieldIDs[i]);
+			}
+			
+			this.collection = Collections.unmodifiableSet(set);
+		}
 	}
 	
-	public FieldSet(Collection<Integer> o1, Collection<Integer> o2) {
-		this();
-		addAll(o1);
-		addAll(o2);
+	protected FieldSet(Collection<Integer> fields) {
+		this.collection = fields;
 	}
 	
-	public FieldSet(FieldSet toCopy) {
-		this();
-		addAll(toCopy);
+	/**
+	 * @param fieldSet The first part of the new set, NOT NULL!
+	 * @param fieldID The ID to be added, NOT NULL!
+	 */
+	private FieldSet(FieldSet fieldSet, Integer fieldID) {
+		if (fieldSet.size() == 0) {
+			this.collection = Collections.singleton(fieldID);
+		}
+		else {
+			HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet.collection.size() + 1));
+			set.addAll(fieldSet.collection);
+			set.add(fieldID);
+			this.collection = Collections.unmodifiableSet(set);
+		}
 	}
 	
-	// --------------------------------------------------------------------------------------------
+	private FieldSet(FieldSet fieldSet, int... fieldIDs) {
+		if (fieldIDs == null || fieldIDs.length == 0) {
+			this.collection = fieldSet.collection;
+		} else {
+			HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet.collection.size() + fieldIDs.length));
+			set.addAll(fieldSet.collection);
+			
+			for (int i = 0; i < fieldIDs.length; i++) {
+				set.add(fieldIDs[i]);
+			}
+			
+			this.collection = Collections.unmodifiableSet(set);
+		}
+	}
 	
-	public void add(Integer columnIndex) {
-		this.collection.add(columnIndex);
+	private FieldSet(FieldSet fieldSet1, FieldSet fieldSet2) {
+		if (fieldSet2.size() == 0) {
+			this.collection = fieldSet1.collection;
+		}
+		else if (fieldSet1.size() == 0) {
+			this.collection = fieldSet2.collection;
+		}
+		else {
+			HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet1.size() + fieldSet2.size()));
+			set.addAll(fieldSet1.collection);
+			set.addAll(fieldSet2.collection);
+			this.collection = Collections.unmodifiableSet(set);
+		}
 	}
-
-	public void addAll(int[] columnIndexes) {
-		for (int i = 0; i < columnIndexes.length; i++) {
-			add(columnIndexes[i]);
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public FieldSet addField(Integer fieldID) {
+		if (fieldID == null) {
+			throw new IllegalArgumentException("Field ID must not be null.");
 		}
+		return new FieldSet(this, fieldID);
 	}
 
-	public void addAll(Collection<Integer> columnIndexes) {
-		this.collection.addAll(columnIndexes);
+	public FieldSet addFields(int... fieldIDs) {
+		return new FieldSet(this, fieldIDs);
 	}
 	
-	public void addAll(FieldSet set) {
-		for (Integer i : set) {
-			add(i);
+	public FieldSet addFields(FieldSet set) {
+		if (set == null) {
+			throw new IllegalArgumentException("FieldSet to add must not be null.");
+		}
+		
+		if (set.size() == 0) {
+			return this;
+		}
+		else if (this.size() == 0) {
+			return set;
+		}
+		else {
+			return new FieldSet(this, set);
 		}
 	}
 	
@@ -87,18 +173,28 @@ public class FieldSet implements Iterable<Integer> {
 		return this.collection.size();
 	}
 	
-
 	@Override
 	public Iterator<Integer> iterator() {
 		return this.collection.iterator();
 	}
 	
+	/**
+	 * Turns the FieldSet into an ordered FieldList.
+	 * 
+	 * @return An ordered FieldList.
+	 */
 	public FieldList toFieldList() {
 		int[] pos = toArray();
 		Arrays.sort(pos);
 		return new FieldList(pos);
 	}
 	
+	/**
+	 * Transforms the field set into an array of field IDs. Whether the IDs are ordered
+	 * or unordered depends on the specific subclass of the field set.
+	 * 
+	 * @return An array of all contained field IDs.
+	 */
 	public int[] toArray() {
 		int[] a = new int[this.collection.size()];
 		int i = 0;
@@ -134,13 +230,11 @@ public class FieldSet implements Iterable<Integer> {
 	
 	// --------------------------------------------------------------------------------------------
 	
-
 	@Override
 	public int hashCode() {
 		return this.collection.hashCode();
 	}
 
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj == null) {
@@ -171,18 +265,18 @@ public class FieldSet implements Iterable<Integer> {
 	}
 	
 
+	/**
+	 * Since instances of FieldSet are strictly immutable, this method does not actually clone,
+	 * but it only returns the original instance.
+	 * 
+	 * @return This object's reference, unmodified.
+	 */
 	public FieldSet clone() {
-		FieldSet set = new FieldSet();
-		set.addAll(this.collection);
-		return set;
+		return this;
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected Collection<Integer> initCollection() {
-		return new HashSet<Integer>();
-	}
-	
 	protected String getDescriptionPrefix() {
 		return "(";
 	}
@@ -190,6 +284,4 @@ public class FieldSet implements Iterable<Integer> {
 	protected String getDescriptionSuffix() {
 		return ")";
 	}
-	
-	public static final FieldSet EMPTY_SET = new FieldSet();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
new file mode 100644
index 0000000..fe772c6
--- /dev/null
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
@@ -0,0 +1,105 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+public class FieldListTest {
+	
+	@Test
+	public void testFieldListConstructors() {
+		check(new FieldList());
+		check(FieldList.EMPTY_LIST);
+		check(new FieldList(14), 14);
+		check(new FieldList(Integer.valueOf(3)), 3);
+		check(new FieldList(7, 4, 1), 7, 4, 1);
+		check(new FieldList(7, 4, 1, 4, 7, 1, 4, 2), 7, 4, 1, 4, 7, 1, 4, 2);
+	}
+	
+	@Test
+	public void testFieldListAdds() {
+		check(new FieldList().addField(1).addField(2), 1, 2);
+		check(FieldList.EMPTY_LIST.addField(3).addField(2), 3, 2);
+		check(new FieldList(13).addFields(new FieldList(17, 31, 42)), 13, 17, 31, 42);
+		check(new FieldList(14).addFields(new FieldList(17)), 14, 17);
+		check(new FieldList(3).addFields(2, 8, 5, 7), 3, 2, 8, 5, 7);
+	}
+	
+	@Test
+	public void testImmutability() {
+		FieldList s1 = new FieldList();
+		FieldList s2 = new FieldList(5);
+		FieldList s3 = new FieldList(Integer.valueOf(7));
+		FieldList s4 = new FieldList(5, 4, 7, 6);
+		
+		s1.addFields(s2).addFields(s3);
+		s2.addFields(s4);
+		s4.addFields(s1);
+		
+		s1.addField(Integer.valueOf(14));
+		s2.addFields(78, 13, 66, 3);
+		
+		assertEquals(0, s1.size());
+		assertEquals(1, s2.size());
+		assertEquals(1, s3.size());
+		assertEquals(4, s4.size());
+	}
+	
+	@Test
+	public void testAddSetToList() {
+		check(new FieldList().addFields(new FieldSet(1)).addFields(2), 1, 2);
+		check(new FieldList().addFields(1).addFields(new FieldSet(2)), 1, 2);
+		check(new FieldList().addFields(new FieldSet(2)), 2);
+	}
+	/** Asserts that the list holds exactly the given elements, in the given order. */
+	private static void check(FieldList set, int... elements) {
+		if (elements == null) {
+			assertEquals(0, set.size());
+			return;
+		}
+		
+		assertEquals(elements.length, set.size());
+		
+		// every expected element must be reported as contained
+		for (int i : elements) {
+			assertTrue(set.contains(i));
+		}
+		
+		// toArray() must return the elements in the expected order
+		{
+			int[] arr = set.toArray();
+			assertTrue(Arrays.equals(arr, elements));
+		}
+		
+		{ // the iterator must yield exactly the expected elements, in order
+			int[] fromIter = new int[set.size()];
+			Iterator<Integer> iter = set.iterator();
+			
+			for (int i = 0; i < fromIter.length; i++) {
+				fromIter[i] = iter.next();
+			}
+			assertFalse(iter.hasNext());
+			assertTrue(Arrays.equals(fromIter, elements));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
new file mode 100644
index 0000000..1a147fc
--- /dev/null
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
@@ -0,0 +1,110 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+public class FieldSetTest {
+	
+	@Test
+	public void testFieldSetConstructors() {
+		check(new FieldSet());
+		check(FieldSet.EMPTY_SET);
+		check(new FieldSet(14), 14);
+		check(new FieldSet(Integer.valueOf(3)), 3);
+		check(new FieldSet(7, 4, 1), 1, 4, 7);
+		check(new FieldSet(7, 4, 1, 4, 7, 1, 4, 2), 1, 4, 2, 7);
+	}
+	
+	@Test
+	public void testFieldSetAdds() {
+		check(new FieldSet().addField(1).addField(2), 1, 2);
+		check(FieldSet.EMPTY_SET.addField(3).addField(2), 3, 2);
+		check(new FieldSet(13).addFields(new FieldSet(17, 31, 42)), 17, 13, 42, 31);
+		check(new FieldSet(14).addFields(new FieldSet(17)), 17, 14);
+		check(new FieldSet(3).addFields(2, 8, 5, 7), 3, 2, 8, 5, 7);
+		check(new FieldSet().addFields(new FieldSet()));
+		check(new FieldSet().addFields(new FieldSet(3, 4)), 4, 3);
+		check(new FieldSet(5, 1).addFields(new FieldSet()), 5, 1);
+	}
+	
+	@Test
+	public void testImmutability() {
+		FieldSet s1 = new FieldSet();
+		FieldSet s2 = new FieldSet(5);
+		FieldSet s3 = new FieldSet(Integer.valueOf(7));
+		FieldSet s4 = new FieldSet(5, 4, 7, 6);
+		
+		s1.addFields(s2).addFields(s3);
+		s2.addFields(s4);
+		s4.addFields(s1);
+		
+		s1.addField(Integer.valueOf(14));
+		s2.addFields(78, 13, 66, 3);
+		
+		assertEquals(0, s1.size());
+		assertEquals(1, s2.size());
+		assertEquals(1, s3.size());
+		assertEquals(4, s4.size());
+	}
+	
+	@Test
+	public void testAddListToSet() {
+		check(new FieldSet().addField(1).addFields(new FieldList(14, 3, 1)), 1, 3, 14);
+	}
+	/** Asserts that the set holds exactly the given elements, ignoring order; sorts {@code elements} in place. */
+	private static void check(FieldSet set, int... elements) {
+		if (elements == null) {
+			assertEquals(0, set.size());
+			return;
+		}
+		
+		assertEquals(elements.length, set.size());
+		
+		// every expected element must be reported as contained
+		for (int i : elements) {
+			assertTrue(set.contains(i));
+		}
+		
+		Arrays.sort(elements);
+		
+		// toArray() must return the expected elements (compared after sorting)
+		{
+			int[] arr = set.toArray();
+			Arrays.sort(arr);
+			assertTrue(Arrays.equals(arr, elements));
+		}
+		
+		{ // the iterator must yield exactly the expected elements (compared after sorting)
+			int[] fromIter = new int[set.size()];
+			Iterator<Integer> iter = set.iterator();
+			
+			for (int i = 0; i < fromIter.length; i++) {
+				fromIter[i] = iter.next();
+			}
+			assertFalse(iter.hasNext());
+			Arrays.sort(fromIter);
+			assertTrue(Arrays.equals(fromIter, elements));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
index 74b1871..4d767b0 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
@@ -294,14 +294,14 @@ public class SemanticPropUtil {
 		}
 
 		matcher = PATTERN_DIGIT.matcher(s);
-		FieldSet fs = new FieldSet();
+		FieldSet fs = FieldSet.EMPTY_SET;
 
 		while (matcher.find()) {
 			int field = Integer.valueOf(matcher.group());
 			if (!isValidField(outType, field) || !isValidField(inType, field)) {
 				throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple.");
 			}
-			fs.add(field);
+			fs = fs.addField(field);
 		}
 		return fs;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
index b9f7230..af6f2d6 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
@@ -25,27 +25,18 @@ import eu.stratosphere.types.Record
 
 case class KeyCardinality(key: FieldSelector, isUnique: Boolean, distinctCount: Option[Long], avgNumRecords: Option[Float]) {
 
-  private class RefreshableFieldSet extends PactFieldSet {
-    def refresh(indexes: Set[Int]) = {
-      this.collection.clear()
-      for (index <- indexes)
-        this.add(index)
-    }
-  }
-
-  @transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], RefreshableFieldSet]()
+  @transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
 
   def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
 
     if (pactFieldSets == null)
-      pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], RefreshableFieldSet]()
+      pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
 
     val keyCopy = key.copy
     contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
-    val keySet = keyCopy.selectedFields.toIndexSet
+    val keySet = keyCopy.selectedFields.toIndexSet.toArray
 
-    val fieldSet = pactFieldSets.getOrElseUpdate(contract, new RefreshableFieldSet())
-    fieldSet.refresh(keySet)
+    val fieldSet = pactFieldSets.getOrElseUpdate(contract, new PactFieldSet(keySet, true))
     fieldSet
   }
 }


[6/6] git commit: [FLINK-935] Fix compiler logic that pushed work out of the iteration loop.

Posted by se...@apache.org.
[FLINK-935] Fix compiler logic that pushed work out of the iteration loop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/31a37393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/31a37393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/31a37393

Branch: refs/heads/master
Commit: 31a373930bce27a0d4abc2cbcff4284d5af9002a
Parents: 229754d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 19:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../compiler/costs/CostEstimator.java           |   2 -
 .../compiler/dag/BulkIterationNode.java         |  12 +-
 .../compiler/dag/BulkPartialSolutionNode.java   |   2 +-
 .../stratosphere/compiler/dag/DataSinkNode.java |   8 +-
 .../compiler/dag/SingleInputNode.java           |  11 +-
 .../stratosphere/compiler/dag/TwoInputNode.java |  16 +-
 .../compiler/dag/WorksetIterationNode.java      |   2 +-
 .../RequestedGlobalProperties.java              |  46 +---
 .../RequestedLocalProperties.java               |  14 +-
 .../eu/stratosphere/compiler/plan/Channel.java  |  76 +++---
 .../plandump/PlanJSONDumpGenerator.java         |   3 -
 .../plantranslate/NepheleJobGraphGenerator.java |   1 -
 .../pact/compiler/DOPChangeTest.java            |   4 +-
 .../pact/compiler/IterationsCompilerTest.java   | 265 +++++++++++++++++++
 .../MultipleIterationsCompilerTest.java         | 221 ----------------
 .../WorksetEmptyConvergenceCriterion.java       |   2 +
 .../pact/runtime/shipping/OutputEmitter.java    |   2 -
 .../runtime/shipping/RecordOutputEmitter.java   |   2 -
 .../pact/runtime/shipping/ShipStrategyType.java |   6 -
 .../test/util/testjar/KMeansForTest.java        |   2 +-
 20 files changed, 345 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 492204b..11fb45b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -94,8 +94,6 @@ public abstract class CostEstimator {
 			case FORWARD:
 //				costs.addHeuristicNetworkCost(channel.getMaxDepth());
 				break;
-			case PARTITION_LOCAL_HASH:
-				break;
 			case PARTITION_RANDOM:
 				addRandomPartitioningCost(channel, costs);
 				break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index c547da9..b60f427 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -279,13 +279,17 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 3) Get the alternative plans
 		List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
 		
-		// 4) Throw away all that are not compatible with the properties currently requested to the
-		//    initial partial solution
+		// 4) Make sure that the beginning of the step function does not assume properties that 
+		//    are not also produced by the end of the step function.
+
 		for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
 			PlanNode candidate = planDeleter.next();
-			if (!(globPropsReq.isMetBy(candidate.getGlobalProperties()) && locPropsReq.isMetBy(candidate.getLocalProperties()))) {
-				planDeleter.remove();
+			
+			// quick-check if the properties at the end of the step function are the same as at the beginning
+			if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
+				continue;
 			}
+			planDeleter.remove();
 		}
 		
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
index 449a2fc..592768b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
@@ -43,7 +43,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
 		if (this.cachedPlans != null) {
 			throw new IllegalStateException();
 		} else {
-			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "BulkPartialSolution("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 3f7b224..15c7670 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -31,7 +31,6 @@ import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.PlanNode;
 import eu.stratosphere.compiler.plan.SinkPlanNode;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -210,12 +209,7 @@ public class DataSinkNode extends OptimizerNode {
 				for (RequestedLocalProperties lp : ips.getLocalProperties()) {
 					Channel c = new Channel(p);
 					gp.parameterizeChannel(c, globalDopChange, localDopChange);
-
-					if (lp.isMetBy(c.getLocalPropertiesAfterShippingOnly())) {
-						c.setLocalStrategy(LocalStrategy.NONE);
-					} else {
-						lp.parameterizeChannel(c);
-					}
+					lp.parameterizeChannel(c);
 					
 					// no need to check whether the created properties meet what we need in case
 					// of ordering or global ordering, because the only interesting properties we have

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 2a9e5c6..fd4b76a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -42,7 +42,6 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
 import eu.stratosphere.compiler.util.NoOpUnaryUdfOp;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -357,21 +356,17 @@ public abstract class SingleInputNode extends OptimizerNode {
 	protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
 			List<PlanNode> target, CostEstimator estimator)
 	{
-		final LocalProperties lp = template.getLocalPropertiesAfterShippingOnly();
 		for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
 			final Channel in = template.clone();
-			if (ilp.isMetBy(lp)) {
-				in.setLocalStrategy(LocalStrategy.NONE);
-			} else {
-				ilp.parameterizeChannel(in);
-			}
+			ilp.parameterizeChannel(in);
 			
 			// instantiate a candidate, if the instantiated local properties meet one possible local property set
+			outer:
 			for (OperatorDescriptorSingle dps: getPossibleProperties()) {
 				for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
 					if (ilps.isMetBy(in.getLocalProperties())) {
 						instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
-						break;
+						break outer;
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 5046ceb..cccbeba 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -51,7 +51,6 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DamBehavior;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -488,24 +487,13 @@ public abstract class TwoInputNode extends OptimizerNode {
 			RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
 			List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
 	{
-		final LocalProperties lp1 = template1.getLocalPropertiesAfterShippingOnly();
-		final LocalProperties lp2 = template2.getLocalPropertiesAfterShippingOnly();
-		
 		for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
 			final Channel in1 = template1.clone();
-			if (ilp1.isMetBy(lp1)) {
-				in1.setLocalStrategy(LocalStrategy.NONE);
-			} else {
-				ilp1.parameterizeChannel(in1);
-			}
+			ilp1.parameterizeChannel(in1);
 			
 			for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
 				final Channel in2 = template2.clone();
-				if (ilp2.isMetBy(lp2)) {
-					in2.setLocalStrategy(LocalStrategy.NONE);
-				} else {
-					ilp2.parameterizeChannel(in2);
-				}
+				ilp2.parameterizeChannel(in2);
 				
 				allPossibleLoop:
 				for (OperatorDescriptorDual dps: this.possibleProperties) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index be02e01..94580ad 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -397,7 +397,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
 
 		ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, inputsMerged1);
+		mergeLists(result1, result2, inputsMerged1); // this method also sets which branches are joined here (in the head)
 		
 		addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 06d7371..3935f5e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -17,8 +17,6 @@ import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.operators.Ordering;
 import eu.stratosphere.api.common.operators.util.FieldSet;
 import eu.stratosphere.compiler.CompilerException;
-import eu.stratosphere.compiler.costs.CostEstimator;
-import eu.stratosphere.compiler.costs.Costs;
 import eu.stratosphere.compiler.dag.OptimizerNode;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.util.Utils;
@@ -31,8 +29,8 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
  * Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
  * or an FieldSet with the hash partitioning columns.
  */
-public final class RequestedGlobalProperties implements Cloneable
-{
+public final class RequestedGlobalProperties implements Cloneable {
+	
 	private PartitioningProperty partitioning;	// the type partitioning
 	
 	private FieldSet partitioningFields;		// the fields which are partitioned
@@ -230,20 +228,12 @@ public final class RequestedGlobalProperties implements Cloneable
 		
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
-		if (!globalDopChange && isMetBy(inGlobals)) {
-			if (localDopChange) {
-				// if the local degree of parallelism changes, we need to adjust
-				if (inGlobals.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
-					// to preserve the hash partitioning, we need to locally hash re-partition
-					channel.setShipStrategy(ShipStrategyType.PARTITION_LOCAL_HASH, inGlobals.getPartitioningFields());
-					return;
-				}
-				// else fall though
-			} else {
-				// we meet already everything, so go forward
-				channel.setShipStrategy(ShipStrategyType.FORWARD);
-				return;
-			}
+		if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
+			channel.setRequiredGlobalProps(this);
+			
+			// all requested global properties are already met, so simply forward the data
+			channel.setShipStrategy(ShipStrategyType.FORWARD);
+			return;
 		}
 		
 		// if we fall through the conditions until here, we need to re-establish
@@ -266,26 +256,6 @@ public final class RequestedGlobalProperties implements Cloneable
 				throw new CompilerException();
 		}
 	}
-	
-	public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, OptimizerNode source, OptimizerNode target) {
-		if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) {
-			return;
-		} else {
-			switch (this.partitioning) {
-				case FULL_REPLICATION:
-					estimator.addBroadcastCost(source, target.getDegreeOfParallelism(), to);
-				case ANY_PARTITIONING:
-				case HASH_PARTITIONED:
-					estimator.addHashPartitioningCost(source, to);
-					break;
-				case RANGE_PARTITIONED:
-					estimator.addRangePartitionCost(source, to);
-					break;
-				default:
-					throw new CompilerException();
-			}
-		}
-	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index 7e67f5f..e714cea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -187,13 +187,23 @@ public class RequestedLocalProperties implements Cloneable {
 	 * @param channel The channel to parameterize.
 	 */
 	public void parameterizeChannel(Channel channel) {
-		if (this.ordering != null) {
+		LocalProperties current = channel.getLocalProperties();
+		
+		if (isMetBy(current)) {
+			// the requested properties are already met; record them on the channel as required
+			channel.setRequiredLocalProps(this);
+		}
+		else if (this.ordering != null) {
 			channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
-		} else if (this.groupedFields != null) {
+		}
+		else if (this.groupedFields != null) {
 			boolean[] dirs = new boolean[this.groupedFields.size()];
 			Arrays.fill(dirs, true);
 			channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs);
 		}
+		else {
+			channel.setLocalStrategy(LocalStrategy.NONE);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 3377caf..1cd6a36 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -22,6 +22,8 @@ import eu.stratosphere.compiler.dag.EstimateProvider;
 import eu.stratosphere.compiler.dag.TempMode;
 import eu.stratosphere.compiler.dataproperties.GlobalProperties;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
 import eu.stratosphere.compiler.plandump.DumpableConnection;
 import eu.stratosphere.compiler.util.Utils;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
@@ -48,6 +50,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	private boolean[] localSortOrder;
 	
+	private RequestedGlobalProperties requiredGlobalProps;
+	
+	private RequestedLocalProperties requiredLocalProps;
+	
 	private GlobalProperties globalProps;
 	
 	private LocalProperties localProps;
@@ -329,6 +335,22 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	// --------------------------------------------------------------------------------------------
 	
 
+	public RequestedGlobalProperties getRequiredGlobalProps() {
+		return requiredGlobalProps;
+	}
+
+	public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+		this.requiredGlobalProps = requiredGlobalProps;
+	}
+
+	public RequestedLocalProperties getRequiredLocalProps() {
+		return requiredLocalProps;
+	}
+
+	public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+		this.requiredLocalProps = requiredLocalProps;
+	}
+
 	public GlobalProperties getGlobalProperties() {
 		if (this.globalProps == null) {
 			this.globalProps = this.source.getGlobalProperties().clone();
@@ -348,19 +370,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 				case PARTITION_RANDOM:
 					this.globalProps.reset();
 					break;
-				case PARTITION_LOCAL_HASH:
-					if (getSource().getGlobalProperties().isPartitionedOnFields(this.shipKeys)) {
-						// after a local hash partitioning, we can only state that the data is somehow
-						// partitioned. even if we had a hash partitioning before over 8 partitions,
-						// locally rehashing that onto 16 partitions (each one partition into two) gives you
-						// a different result than directly hashing to 16 partitions. the hash-partitioning
-						// property is only valid, if the assumed built in hash function is directly used.
-						// hence, we can only state that this is some form of partitioning.
-						this.globalProps.setAnyPartitioning(this.shipKeys);
-					} else {
-						this.globalProps.reset();
-					}
-					break;
 				case NONE:
 					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
 			}
@@ -371,12 +380,13 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	public LocalProperties getLocalProperties() {
 		if (this.localProps == null) {
-			this.localProps = getLocalPropertiesAfterShippingOnly().clone();
+			computeLocalPropertiesAfterShippingOnly();
 			switch (this.localStrategy) {
 				case NONE:
 					break;
 				case SORT:
 				case COMBININGSORT:
+					this.localProps = new LocalProperties();
 					this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
 					break;
 				default:
@@ -387,25 +397,19 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		return this.localProps;
 	}
 	
-	public LocalProperties getLocalPropertiesAfterShippingOnly() {
-		if (this.shipStrategy == ShipStrategyType.FORWARD) {
-			return this.source.getLocalProperties();
-		} else {
-			final LocalProperties props = this.source.getLocalProperties().clone();
-			switch (this.shipStrategy) {
-				case BROADCAST:
-				case PARTITION_HASH:
-				case PARTITION_RANGE:
-				case PARTITION_RANDOM:
-					props.reset();
-					break;
-				case PARTITION_LOCAL_HASH:
-				case FORWARD:
-					break;
-				case NONE:
-					throw new CompilerException("ShipStrategy has not yet been set.");
-			}
-			return props;
+	private void computeLocalPropertiesAfterShippingOnly() {
+		switch (this.shipStrategy) {
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+				this.localProps = new LocalProperties();
+				break;
+			case FORWARD:
+				this.localProps = this.source.getLocalProperties();
+				break;
+			case NONE:
+				throw new CompilerException("ShipStrategy has not yet been set.");
 		}
 	}
 	
@@ -423,8 +427,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		// some strategies globally reestablish properties
 		switch (this.shipStrategy) {
 		case FORWARD:
-		case PARTITION_LOCAL_HASH:
-			throw new CompilerException("Cannot use FORWARD or LOCAL_HASH strategy between operations " +
+			throw new CompilerException("Cannot use FORWARD strategy between operations " +
 					"with different number of parallel instances.");
 		case NONE: // excluded by sanity check. lust here for verification check completion
 		case BROADCAST:
@@ -452,8 +455,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		case FORWARD:
 			this.globalProps.reset();
 			return;
-		case NONE: // excluded by sanity check. lust here for verification check completion
-		case PARTITION_LOCAL_HASH:
+		case NONE: // excluded by sanity check. just here to silence compiler warnings about incomplete switch coverage
 		case BROADCAST:
 		case PARTITION_HASH:
 		case PARTITION_RANGE:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index 183ee2f..e89a18b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -319,9 +319,6 @@ public class PlanJSONDumpGenerator {
 						case PARTITION_RANGE:
 							shipStrategy = "Range Partition";
 							break;
-						case PARTITION_LOCAL_HASH:
-							shipStrategy = "Hash Partition (local)";
-							break;
 						case PARTITION_RANDOM:
 							shipStrategy = "Redistribute";
 							break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 4817095..aebf2cf 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1041,7 +1041,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 		switch (channel.getShipStrategy()) {
 			case FORWARD:
-			case PARTITION_LOCAL_HASH:
 				distributionPattern = DistributionPattern.POINTWISE;
 				channelType = ChannelType.NETWORK;
 				break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 9f53c30..273c42c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,8 +209,8 @@ public class DOPChangeTest extends CompilerTestBase {
 		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
 		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
 		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_LOCAL_HASH, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, reduceIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
new file mode 100644
index 0000000..a123099
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -0,0 +1,265 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+
+
+@SuppressWarnings({"serial", "unchecked"})
+public class IterationsCompilerTest extends CompilerTestBase {
+
+	@Test
+	public void testTwoIterationsWithMapperInbetween() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
+			
+			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
+			
+			depResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoIterationsDirectlyChained() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
+			
+			depResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
+			
+			secondResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testIterationPushingWorkOut() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+			
+			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
+			
+			doBulkIteration(input1, input2).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
+			
+			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+		
+		// open a bulk iteration
+		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+		
+		DataSet<Tuple2<Long, Long>> changes = iteration
+				.join(edges).where(0).equalTo(0).with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+		
+		// close the bulk iteration
+		return iteration.closeWith(changes);
+	}
+				
+		
+	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
+				
+		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
+				.projectSecond(1).types(Long.class);
+		
+		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
+		
+		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
+				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
+		
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
+				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1);
+		
+		DataSet<Tuple2<Long, Long>> updatedComponentId = 
+				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+		
+		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
+		
+		return depResult;
+		
+	}
+	
+	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return null;
+		}
+	}
+	
+	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+		
+		@Override
+		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
+	}
+	
+	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			return value;
+		}
+	}
+	
+	@ConstantFields("0")
+	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+		
+		@Override
+		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+	}
+	
+	@ConstantFields("0")
+	public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
+			return new Tuple2<Long, Long>(value.f0, value.f0);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
deleted file mode 100644
index 5cf1425..0000000
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.compiler;
-
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-
-import org.junit.Test;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.DeltaIteration;
-import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.aggregation.Aggregations;
-import eu.stratosphere.api.java.functions.FlatMapFunction;
-import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.functions.GroupReduceFunction;
-import eu.stratosphere.api.java.functions.JoinFunction;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.compiler.plan.OptimizedPlan;
-import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class MultipleIterationsCompilerTest extends CompilerTestBase {
-
-	@Test
-	public void testTwoIterationsWithMapperInbetween() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-			
-			secondResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-		
-		// open a bulk iteration
-		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
-		
-		DataSet<Tuple2<Long, Long>> changes = iteration
-				.join(edges).where(0).equalTo(0).with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(iteration).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		// close the bulk iteration
-		return iteration.closeWith(changes);
-	}
-				
-		
-	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
-		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
-				
-		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
-				.projectSecond(1).types(Long.class);
-		
-		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
-		
-		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
-				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
-		
-		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
-				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1);
-		
-		DataSet<Tuple2<Long, Long>> updatedComponentId = 
-				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-		
-		return depResult;
-		
-	}
-	
-	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-			return null;
-		}
-	}
-	
-	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-		
-		@Override
-		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
-	}
-	
-	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-			return value;
-		}
-	}
-	
-	@ConstantFields("0")
-	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-		
-		@Override
-		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index bb8af9c..c9fd442 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -24,6 +24,8 @@ import eu.stratosphere.types.LongValue;
  */
 public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
+	private static final long serialVersionUID = 1L;
+
 	private static final Log log = LogFactory.getLog(WorksetEmptyConvergenceCriterion.class);
 	
 	public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 6491749..638dd1d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -78,7 +78,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		switch (strategy) {
 		case FORWARD:
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
 		case BROADCAST:
@@ -103,7 +102,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		case PARTITION_RANDOM:
 			return robin(numberOfChannels);
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 			return hashPartitionDefault(record.getInstance(), numberOfChannels);
 		case PARTITION_RANGE:
 			return rangePartition(record.getInstance(), numberOfChannels);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index 7fe35b4..efdb267 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -83,7 +83,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
 		switch (strategy) {
 		case FORWARD:
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
 			this.channels = new int[1];
@@ -110,7 +109,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
 		case PARTITION_RANDOM:
 			return robin(numberOfChannels);
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 			return hashPartitionDefault(record, numberOfChannels);
 		case PARTITION_RANGE:
 			return rangePartition(record, numberOfChannels);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index 2b50779..a86c209 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -40,12 +40,6 @@ public enum ShipStrategyType {
 	PARTITION_HASH(true, true, true),
 	
 	/**
-	 * Repartitioning the data within a local instance with a hash function. Happens for example when the
-	 * intra-node degree-of-parallelism is increased. 
-	 */
-	PARTITION_LOCAL_HASH(false, true, true),
-	
-	/**
 	 * Partitioning the data in ranges according to a total order.
 	 */
 	PARTITION_RANGE(true, true, true),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index c13ddc8..d1b249a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -45,7 +45,7 @@ public class KMeansForTest implements Program{
 		}
 	
 		// set up execution environment
-		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
 		
 		// get input data
 		DataSet<Point> points = getPointDataSet(env);


[2/6] git commit: [FLINK-935] (continued) Correct feedback property check for iterative algorithms.

Posted by se...@apache.org.
[FLINK-935] (continued) Correct feedback property check for iterative algorithms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ca2b287a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ca2b287a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ca2b287a

Branch: refs/heads/master
Commit: ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f
Parents: ed7c3f1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 15:05:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroOutputFormatTest.java          |    1 +
 .../compiler/dag/BulkIterationNode.java         |   61 +-
 .../stratosphere/compiler/dag/DataSinkNode.java |    2 +
 .../compiler/dag/SingleInputNode.java           |    2 +
 .../stratosphere/compiler/dag/TwoInputNode.java |   17 +-
 .../compiler/dag/WorksetIterationNode.java      |   55 +-
 .../dataproperties/LocalProperties.java         |    2 +-
 .../RequestedGlobalProperties.java              |    2 -
 .../RequestedLocalProperties.java               |    8 +-
 .../eu/stratosphere/compiler/plan/PlanNode.java |  103 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |    1 -
 .../compiler/FeedbackPropertiesMatchTest.java   | 1432 ++++++++++++++++++
 .../pact/compiler/IterationsCompilerTest.java   |    3 -
 .../testfunctions/DummyJoinFunction.java        |   28 +
 .../example/java/graph/PageRankBasic.java       |    8 +-
 .../pact/runtime/cache/FileCache.java           |    9 +-
 .../pact/runtime/shipping/ShipStrategyType.java |    2 +-
 .../netty/NettyConnectionManagerTest.java       |    2 +-
 .../iterations/ConnectedComponentsTest.java     |    4 +-
 .../iterations/PageRankCompilerTest.java        |  108 ++
 20 files changed, 1807 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
index b0e4b6e..20610a2 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 
+@SuppressWarnings("serial")
 public class AvroOutputFormatTest extends JavaProgramTestBase {
 
 	public static String outputPath1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index b60f427..f6720ea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -13,12 +13,14 @@
 
 package eu.stratosphere.compiler.dag;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import eu.stratosphere.api.common.operators.base.BulkIterationBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
 import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.DataStatistics;
 import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor;
@@ -36,6 +38,9 @@ import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.NamedChannel;
 import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -271,6 +276,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 1) Because we enumerate multiple times, we may need to clean the cached plans
 		//    before starting another enumeration
 		this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+		if (this.terminationCriterion != null) {
+			this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+		}
 		
 		// 2) Give the partial solution the properties of the current candidate for the initial partial solution
 		this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
@@ -282,14 +290,51 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 4) Make sure that the beginning of the step function does not assume properties that 
 		//    are not also produced by the end of the step function.
 
-		for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
-			PlanNode candidate = planDeleter.next();
+		{
+			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
 			
-			// quick-check if the properties at the end of the step function are the same as at the beginning
-			if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
-				continue;
+			for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
+				PlanNode candidate = planDeleter.next();
+				
+				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+				LocalProperties atEndLocal = candidate.getLocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+					; // depends only through broadcast variable on the partial solution
+				}
+				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+					// attach a no-op node through which we create the properties of the original input
+					Channel toNoOp = new Channel(candidate);
+					globPropsReq.parameterizeChannel(toNoOp, false, false);
+					locPropsReq.parameterizeChannel(toNoOp);
+					
+					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+					
+					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+					estimator.costOperator(rebuildPropertiesPlanNode);
+						
+					GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
+					LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
+						
+					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
+						
+						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+							newCandidates.add(rebuildPropertiesPlanNode);
+						}
+					}
+					
+					planDeleter.remove();
+				}
 			}
-			planDeleter.remove();
+		}
+		
+		if (candidates.isEmpty()) {
+			return;
 		}
 		
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
@@ -302,13 +347,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 				target.add(node);
 			}
 		}
-		else if(candidates.size() > 0) {
+		else if (candidates.size() > 0) {
 			List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
 
 			SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
 			
 			for (PlanNode candidate : candidates) {
-				for(PlanNode terminationCandidate : terminationCriterionCandidates) {
+				for (PlanNode terminationCandidate : terminationCriterionCandidates) {
 					if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
 						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
 						GlobalProperties gProps = candidate.getGlobalProperties().clone();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 15c7670..fe823d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -210,6 +210,8 @@ public class DataSinkNode extends OptimizerNode {
 					Channel c = new Channel(p);
 					gp.parameterizeChannel(c, globalDopChange, localDopChange);
 					lp.parameterizeChannel(c);
+					c.setRequiredLocalProps(lp);
+					c.setRequiredGlobalProps(gp);
 					
 					// no need to check whether the created properties meet what we need in case
 					// of ordering or global ordering, because the only interesting properties we have

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index fd4b76a..e1727f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -312,6 +312,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 					// requested properties
 					for (RequestedGlobalProperties rgps: allValidGlobals) {
 						if (rgps.isMetBy(c.getGlobalProperties())) {
+							c.setRequiredGlobalProps(rgps);
 							addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
 							break;
 						}
@@ -365,6 +366,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 			for (OperatorDescriptorSingle dps: getPossibleProperties()) {
 				for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
 					if (ilps.isMetBy(in.getLocalProperties())) {
+						in.setRequiredLocalProps(ilps);
 						instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
 						break outer;
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index cccbeba..9898c81 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -447,9 +447,13 @@ public abstract class TwoInputNode extends OptimizerNode {
 							if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
 								gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
 							{
+								Channel c1Clone = c1.clone();
+								c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+								c2.setRequiredGlobalProps(gpp.getProperties2());
+								
 								// we form a valid combination, so create the local candidates
 								// for this
-								addLocalCandidates(c1, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+								addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
 								break;
 							}
 						}
@@ -495,7 +499,6 @@ public abstract class TwoInputNode extends OptimizerNode {
 				final Channel in2 = template2.clone();
 				ilp2.parameterizeChannel(in2);
 				
-				allPossibleLoop:
 				for (OperatorDescriptorDual dps: this.possibleProperties) {
 					for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
 						if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
@@ -507,12 +510,16 @@ public abstract class TwoInputNode extends OptimizerNode {
 							if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
 								in1.getLocalProperties(), in2.getLocalProperties()))
 							{
+								Channel in1Copy = in1.clone();
+								in1Copy.setRequiredLocalProps(lpp.getProperties1());
+								in2.setRequiredLocalProps(lpp.getProperties2());
+								
 								// all right, co compatible
-								instantiate(dps, in1, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
-								break allPossibleLoop;
+								instantiate(dps, in1Copy, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+								break;
 							} else {
 								// meet, but not co-compatible
-								throw new CompilerException("Implements to adjust one side to the other!");
+//								throw new CompilerException("Implements to adjust one side to the other!");
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 76c4402..2c70794 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -41,6 +41,7 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
 import eu.stratosphere.compiler.plan.SolutionSetPlanNode;
 import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
 import eu.stratosphere.compiler.plan.WorksetPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
 import eu.stratosphere.compiler.util.NoOpBinaryUdfOp;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
@@ -279,7 +280,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	@Override
 	protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
 			List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-			RequestedGlobalProperties globPropsReqSolutionSet,RequestedGlobalProperties globPropsReqWorkset,
+			RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
 			RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
 	{
 		// check for pipeline breaking using hash join with build on the solution set side
@@ -314,12 +315,54 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		//    initial partial solution
 		
 		// Make sure that the workset candidates fulfill the input requirements
-		for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
-			PlanNode candidate = planDeleter.next();
-			if (!(globPropsReqWorkset.isMetBy(candidate.getGlobalProperties()) && locPropsReqWorkset.isMetBy(candidate.getLocalProperties()))) {
-				planDeleter.remove();
+		{
+			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+			
+			for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
+				PlanNode candidate = planDeleter.next();
+				
+				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+				LocalProperties atEndLocal = candidate.getLocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
+				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+					; // depends only through broadcast variable on the workset solution
+				}
+				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+					// attach a no-op node through which we create the properties of the original input
+					Channel toNoOp = new Channel(candidate);
+					globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
+					locPropsReqWorkset.parameterizeChannel(toNoOp);
+					
+					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
+					
+					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+					
+					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+					rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+					estimator.costOperator(rebuildWorksetPropertiesPlanNode);
+						
+					GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
+					LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
+						
+					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobalModified, atEndLocalModified);
+						
+						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+							newCandidates.add(rebuildWorksetPropertiesPlanNode);
+						}
+					}
+					
+					// remove the original operator and add the modified candidate
+					planDeleter.remove();
+					
+				}
 			}
+			
+			worksetCandidates.addAll(newCandidates);
 		}
+		
 		if (worksetCandidates.isEmpty()) {
 			return;
 		}
@@ -342,7 +385,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		gp.setHashPartitioned(this.solutionSetKeyFields);
 		gp.addUniqueFieldCombination(this.solutionSetKeyFields);
 		
-		LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
+		LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
 		
 		// take all combinations of solution set delta and workset plans
 		for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index b45e341..f49c130 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -27,7 +27,7 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
  */
 public class LocalProperties implements Cloneable {
 	
-	public static final LocalProperties TRIVIAL = new LocalProperties();
+	public static final LocalProperties EMPTY = new LocalProperties();
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 3935f5e..574922a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -229,8 +229,6 @@ public final class RequestedGlobalProperties implements Cloneable {
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
 		if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
-			channel.setRequiredGlobalProps(this);
-			
 			// we meet already everything, so go forward
 			channel.setShipStrategy(ShipStrategyType.FORWARD);
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index e714cea..aeae0d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -33,9 +33,9 @@ public class RequestedLocalProperties implements Cloneable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	Ordering ordering;			// order inside a partition, null if not ordered
+	private Ordering ordering;			// order inside a partition, null if not ordered
 
-	FieldSet groupedFields;		// fields by which the stream is grouped. null if not grouped.
+	private FieldSet groupedFields;		// fields by which the stream is grouped. null if not grouped.
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -190,8 +190,8 @@ public class RequestedLocalProperties implements Cloneable {
 		LocalProperties current = channel.getLocalProperties();
 		
 		if (isMetBy(current)) {
-			// we are met. record that this is needed.
-			channel.setRequiredLocalProps(this);
+			// we are met, all is good
+			channel.setLocalStrategy(LocalStrategy.NONE);
 		}
 		else if (this.ordering != null) {
 			channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 16241e4..69263bc 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -29,7 +29,9 @@ import eu.stratosphere.compiler.dataproperties.GlobalProperties;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.plandump.DumpableConnection;
 import eu.stratosphere.compiler.plandump.DumpableNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitable;
 
 /**
@@ -434,8 +436,85 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * Checks whether this node has a dam on the way down to the given source node. This method
+	 * returns either that (a) the source node is not found as a (transitive) child of this node,
+	 * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+	 * the path.
+	 * 
+	 * @param source The node on the path to which the dam is sought.
+	 * @return The result whether the node is found and whether a dam is on the path.
+	 */
 	public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
 	
+	public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+		if (this == partialSolution) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+		
+		boolean found = false;
+		boolean allMet = true;
+		boolean allLocallyMet = true;
+		
+		for (Channel input : getInputs()) {
+			FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+			
+			if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+				found = true;
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+				return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+			}
+			else {
+				found = true;
+				
+				// the partial solution was on the path here. check whether the channel requires
+				// certain properties that are met, or whether the channel introduces new properties
+				
+				// if the plan introduces new global properties, then we can stop looking whether
+				// the feedback properties are sufficient to meet the requirements
+				if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+					continue;
+				}
+				
+				// first check whether this channel requires something that is not met
+				if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+					return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+				}
+				
+				// at this point, not all requirements are known to be met yet
+				allMet = false;
+				
+				// if the plan introduces new local properties, we can stop checking for matching local properties
+				if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+					
+					if (input.getLocalStrategy() == LocalStrategy.NONE) {
+						
+						if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+							return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+						}
+						
+						allLocallyMet = false;
+					}
+				}
+			}
+		}
+		
+		if (!found) {
+			return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+		} else if (allMet) {
+			return FeedbackPropertiesMeetRequirementsReport.MET;
+		} else if (allLocallyMet) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+		} else {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 
@@ -447,19 +526,16 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	
 	// --------------------------------------------------------------------------------------------
 	
-
 	@Override
 	public OptimizerNode getOptimizerNode() {
 		return this.template;
 	}
 
-
 	@Override
 	public PlanNode getPlanNode() {
 		return this;
 	}
 
-
 	@Override
 	public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
 		List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
@@ -480,4 +556,25 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	public static enum SourceAndDamReport {
 		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
 	}
+	
+	
+	
+	public static enum FeedbackPropertiesMeetRequirementsReport {
+		/** Indicates that the path is irrelevant */
+		NO_PARTIAL_SOLUTION,
+		
+		/** Indicates that whether the properties are met is still pending, and that the outcome
+		 * depends on both the global and the local properties */
+		PENDING,
+		
+		/** Indicates that whether the properties are met is still pending, and that the outcome
+		 * depends on the global properties only (the local properties are already met) */
+		PENDING_LOCAL_MET,
+		
+		/** Indicates that the question whether the properties are met has been determined true */
+		MET,
+		
+		/** Indicates that the question whether the properties are met has been determined false */
+		NOT_MET;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index c2b377e..1c99558 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -76,7 +76,6 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 		oPlan.accept(new Visitor<PlanNode>() {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				System.out.println(visitable);
 				if (visitable instanceof WorksetIterationPlanNode) {
 					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
 


[4/6] git commit: Minor fix for displayed estimates.

Posted by se...@apache.org.
Minor fix for displayed estimates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/229754d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/229754d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/229754d2

Branch: refs/heads/master
Commit: 229754d20076fd24f97c3c0b87cbf18828de8e08
Parents: 708426b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 16:54:42 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../main/java/eu/stratosphere/compiler/dag/OptimizerNode.java | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/229754d2/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index f2a9053..70471c1 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -630,6 +630,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		// let every operator do its computation
 		computeOperatorSpecificDefaultEstimates(statistics);
 		
+		if (this.estimatedOutputSize < 0) {
+			this.estimatedOutputSize = -1;
+		}
+		if (this.estimatedNumRecords < 0) {
+			this.estimatedNumRecords = -1;
+		}
+		
 		// overwrite default estimates with hints, if given
 		if (getPactContract() == null || getPactContract().getCompilerHints() == null) {
 			return ;


[5/6] git commit: Made LocalProperties Immutable (avoid side effect mutation bugs) Change several Iterators to Iterables in the compiler (simpler code)

Posted by se...@apache.org.
Made LocalProperties Immutable (avoid side effect mutation bugs)
Change several Iterators to Iterables in the compiler (simpler code)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ed7c3f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ed7c3f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ed7c3f15

Branch: refs/heads/master
Commit: ed7c3f156ec632af6f5764fad8650a771b69c1d7
Parents: 202aa4a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 09:13:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/compiler/PactCompiler.java  |   9 +-
 .../compiler/costs/CostEstimator.java           |   5 +-
 .../compiler/dag/OptimizerNode.java             |   8 +-
 .../compiler/dag/WorksetIterationNode.java      |   3 +-
 .../dataproperties/LocalProperties.java         | 133 +++++++++----------
 .../operators/AllGroupReduceProperties.java     |   3 +-
 .../AllGroupWithPartialPreGroupProperties.java  |   3 +-
 .../compiler/operators/CoGroupDescriptor.java   |   3 +-
 .../operators/CollectorMapDescriptor.java       |   3 +-
 .../CrossStreamOuterFirstDescriptor.java        |   6 +-
 .../CrossStreamOuterSecondDescriptor.java       |   6 +-
 .../compiler/operators/FlatMapDescriptor.java   |   3 +-
 .../operators/GroupReduceProperties.java        |   3 +-
 .../GroupReduceWithCombineProperties.java       |   3 +-
 .../operators/PartialGroupProperties.java       |   3 +-
 .../compiler/operators/ReduceProperties.java    |   3 +-
 .../operators/SortMergeJoinDescriptor.java      |   3 +-
 .../plan/BulkPartialSolutionPlanNode.java       |   9 +-
 .../eu/stratosphere/compiler/plan/Channel.java  |   3 +-
 .../compiler/plan/DualInputPlanNode.java        |  58 +-------
 .../compiler/plan/NAryUnionPlanNode.java        |  14 +-
 .../eu/stratosphere/compiler/plan/PlanNode.java |  22 +--
 .../compiler/plan/SingleInputPlanNode.java      |  53 +-------
 .../compiler/plan/SolutionSetPlanNode.java      |   9 +-
 .../compiler/plan/SourcePlanNode.java           |   9 +-
 .../compiler/plan/WorksetPlanNode.java          |   9 +-
 .../compiler/plandump/DumpableNode.java         |   6 +-
 .../plandump/PlanJSONDumpGenerator.java         |   9 +-
 .../plantranslate/NepheleJobGraphGenerator.java |  10 +-
 .../postpass/GenericFlatTypePostPass.java       |   5 +-
 .../compiler/postpass/JavaApiPostPass.java      |   5 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |   2 +-
 .../compiler/UnionPropertyPropagationTest.java  |   5 +-
 .../eu/stratosphere/util/IterableIterator.java  |  26 ++++
 34 files changed, 181 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 8399fed..96eb01d 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -1284,8 +1284,7 @@ public class PactCompiler {
 					}
 					
 					// assign memory to the local and global strategies of the channels
-					for (Iterator<Channel> channels = node.getInputs(); channels.hasNext();) {
-						final Channel c = channels.next();
+					for (Channel c : node.getInputs()) {
 						if (c.getLocalStrategy().dams()) {
 							final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
 							c.setMemoryLocalStrategy(mem);
@@ -1362,8 +1361,7 @@ public class PactCompiler {
 			
 			// double-connect the connections. previously, only parents knew their children, because
 			// one child candidate could have been referenced by multiple parents.
-			for (Iterator<Channel> iter = visitable.getInputs(); iter.hasNext();) {
-				final Channel conn = iter.next();
+			for (Channel conn : visitable.getInputs()) {
 				conn.setTarget(visitable);
 				conn.getSource().addOutgoingChannel(conn);
 			}
@@ -1375,8 +1373,7 @@ public class PactCompiler {
 
 			// count the memory consumption
 			this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
-			for (Iterator<Channel> channels = visitable.getInputs(); channels.hasNext();) {
-				final Channel c = channels.next();
+			for (Channel c : visitable.getInputs()) {
 				if (c.getLocalStrategy().dams()) {
 					this.memoryConsumerWeights++;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 11fb45b..3e1d94a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -71,8 +71,7 @@ public abstract class CostEstimator {
 		final long availableMemory = n.getGuaranteedAvailableMemory();
 		
 		// add the shipping strategy costs
-		for (Iterator<Channel> channels = n.getInputs(); channels.hasNext(); ) {
-			final Channel channel = channels.next();
+		for (Channel channel : n.getInputs()) {
 			final Costs costs = new Costs();
 			
 			// Plans that apply the same strategies, but at different points
@@ -139,7 +138,7 @@ public abstract class CostEstimator {
 		
 		// get the inputs, if we have some
 		{
-			Iterator<Channel> channels = n.getInputs();
+			Iterator<Channel> channels = n.getInputs().iterator();
 			if (channels.hasNext()) {
 				firstInput = channels.next();
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index 70471c1..b2c9330 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -285,7 +285,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	// ------------------------------------------------------------------------
 
 	@Override
-	public Iterator<OptimizerNode> getPredecessors() {
+	public Iterable<OptimizerNode> getPredecessors() {
 		List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();
 		
 		for (Iterator<PactConnection> inputs = getIncomingConnections().iterator(); inputs.hasNext(); ){
@@ -296,7 +296,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 			allPredecessors.add(conn.getSource());
 		}
 		
-		return allPredecessors.iterator();
+		return allPredecessors;
 	}
 	
 	/**
@@ -1208,13 +1208,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	}
 	
 	@Override
-	public Iterator<DumpableConnection<OptimizerNode>> getDumpableInputs() {
+	public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() {
 		List<DumpableConnection<OptimizerNode>> allInputs = new ArrayList<DumpableConnection<OptimizerNode>>();
 		
 		allInputs.addAll(getIncomingConnections());
 		allInputs.addAll(getBroadcastConnections());
 		
-		return allInputs.iterator();
+		return allInputs;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 94580ad..76c4402 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -342,8 +342,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		gp.setHashPartitioned(this.solutionSetKeyFields);
 		gp.addUniqueFieldCombination(this.solutionSetKeyFields);
 		
-		final LocalProperties lp = new LocalProperties();
-		lp.addUniqueFields(this.solutionSetKeyFields);
+		LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
 		
 		// take all combinations of solution set delta and workset plans
 		for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index d4cbeca..b45e341 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -25,8 +25,12 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
  * This class represents local properties of the data. A local property is a property that exists
  * within the data of a single partition.
  */
-public class LocalProperties implements Cloneable
-{
+public class LocalProperties implements Cloneable {
+	
+	public static final LocalProperties TRIVIAL = new LocalProperties();
+	
+	// --------------------------------------------------------------------------------------------
+	
 	private Ordering ordering;			// order inside a partition, null if not ordered
 
 	private FieldList groupedFields;		// fields by which the stream is grouped. null if not grouped.
@@ -39,23 +43,6 @@ public class LocalProperties implements Cloneable
 	 * Default constructor for trivial local properties. No order, no grouping, no uniqueness.
 	 */
 	public LocalProperties() {}
-	
-	/**
-	 * Creates a new instance of local properties that have the given ordering as property,
-	 * field grouping and field uniqueness. Any of the given parameters may be null. Beware, though,
-	 * that a null grouping is inconsistent with a non-null ordering.
-	 * <p>
-	 * This constructor is used only for internal copy creation.
-	 * 
-	 * @param ordering The ordering represented by these local properties.
-	 * @param groupedFields The grouped fields for these local properties.
-	 * @param uniqueFields The unique fields for these local properties.
-	 */
-	private LocalProperties(Ordering ordering, FieldList groupedFields, Set<FieldSet> uniqueFields) {
-		this.ordering = ordering;
-		this.groupedFields = groupedFields;
-		this.uniqueFields = uniqueFields;
-	}
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -69,17 +56,6 @@ public class LocalProperties implements Cloneable
 	}
 	
 	/**
-	 * Sets the key order for these global properties.
-	 * 
-	 * @param keyOrder
-	 *        The key order to set.
-	 */
-	public void setOrdering(Ordering ordering) {
-		this.ordering = ordering;
-		this.groupedFields = ordering.getInvolvedIndexes();
-	}
-	
-	/**
 	 * Gets the grouped fields.
 	 * 
 	 * @return The grouped fields, or <code>null</code> if nothing is grouped.
@@ -87,15 +63,6 @@ public class LocalProperties implements Cloneable
 	public FieldList getGroupedFields() {
 		return this.groupedFields;
 	}
-	
-	/**
-	 * Sets the fields that are grouped in these data properties.
-	 * 
-	 * @param groupedFields The fields that are grouped in these data properties.
-	 */
-	public void setGroupedFields(FieldList groupedFields) {
-		this.groupedFields = groupedFields;	
-	}
 
 	/**
 	 * Gets the fields whose combination is unique within the data set.
@@ -106,6 +73,12 @@ public class LocalProperties implements Cloneable
 		return this.uniqueFields;
 	}
 	
+	/**
+	 * Checks whether the given set of fields is unique, as specified in these local properties.
+	 * 
+	 * @param set The set to check.
+	 * @return True, if the given column combination is unique, false if not.
+	 */
 	public boolean areFieldsUnique(FieldSet set) {
 		return this.uniqueFields != null && this.uniqueFields.contains(set);
 	}
@@ -115,42 +88,33 @@ public class LocalProperties implements Cloneable
 	 * 
 	 * @param uniqueFields The fields that are unique in these data properties.
 	 */
-	public void addUniqueFields(FieldSet uniqueFields) {
-		if (this.uniqueFields == null) {
-			this.uniqueFields = new HashSet<FieldSet>();
+	public LocalProperties addUniqueFields(FieldSet uniqueFields) {
+		LocalProperties copy = clone();
+		
+		if (copy.uniqueFields == null) {
+			copy.uniqueFields = new HashSet<FieldSet>();
 		}
-		this.uniqueFields.add(uniqueFields);
+		copy.uniqueFields.add(uniqueFields);
+		return copy;
 	}
 	
-	public void clearUniqueFieldSets() {
-		if (this.uniqueFields != null) {
-			this.uniqueFields = null;
+	public LocalProperties clearUniqueFieldSets() {
+		if (this.uniqueFields == null || this.uniqueFields.isEmpty()) {
+			return this;
+		} else {
+			LocalProperties copy = new LocalProperties();
+			copy.ordering = this.ordering;
+			copy.groupedFields = this.groupedFields;
+			return copy;
 		}
 	}
 	
-	public boolean areFieldsGrouped(FieldSet set) {
-		return this.groupedFields != null && this.groupedFields.isValidUnorderedPrefix(set);
-	}
-	
-	public boolean meetsOrderingConstraint(Ordering o) {
-		return o.isMetBy(this.ordering);
-	}
-	
 	/**
 	 * Checks, if the properties in this object are trivial, i.e. only standard values.
 	 */
 	public boolean isTrivial() {
 		return ordering == null && this.groupedFields == null && this.uniqueFields == null;
 	}
-
-	/**
-	 * This method resets the local properties to a state where no properties are given.
-	 */
-	public void reset() {
-		this.ordering = null;
-		this.groupedFields = null;
-		this.uniqueFields = null;
-	}
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -162,8 +126,7 @@ public class LocalProperties implements Cloneable
 	 * 
 	 * @return True, if the resulting properties are non trivial.
 	 */
-	public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input)
-	{
+	public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
 		// check, whether the local order is preserved
 		Ordering no = this.ordering;
 		FieldList ngf = this.groupedFields;
@@ -183,7 +146,8 @@ public class LocalProperties implements Cloneable
 					break;
 				}
 			}
-		} else if (this.groupedFields != null) {
+		}
+		else if (this.groupedFields != null) {
 			// check, whether the local key grouping is preserved
 			for (Integer index : this.groupedFields) {
 				if (!node.isFieldConstant(input, index)) {
@@ -192,8 +156,7 @@ public class LocalProperties implements Cloneable
 			}
 		}
 		
-		// check, whether the local key grouping is preserved
-		if (this.uniqueFields != null) {
+		if (this.uniqueFields != null && this.uniqueFields.size() > 0) {
 			Set<FieldSet> s = new HashSet<FieldSet>(this.uniqueFields);
 			for (FieldSet fields : this.uniqueFields) {
 				for (Integer index : fields) {
@@ -208,9 +171,15 @@ public class LocalProperties implements Cloneable
 			}
 		}
 		
-		return (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) ? this :
-			(no == null && ngf == null && nuf == null) ? new LocalProperties() :
-					new LocalProperties(no, ngf, nuf);
+		if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) {
+			return this;
+		} else {
+			LocalProperties lp = new LocalProperties();
+			lp.ordering = no;
+			lp.groupedFields = ngf;
+			lp.uniqueFields = nuf;
+			return lp;
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -245,8 +214,11 @@ public class LocalProperties implements Cloneable
 
 	@Override
 	public LocalProperties clone() {
-		return new LocalProperties(this.ordering, this.groupedFields,
-			this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+		LocalProperties copy = new LocalProperties();
+		copy.ordering = this.ordering;
+		copy.groupedFields = this.groupedFields;
+		copy.uniqueFields = (this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+		return copy;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -268,4 +240,19 @@ public class LocalProperties implements Cloneable
 			return lp1;
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static LocalProperties forOrdering(Ordering o) {
+		LocalProperties props = new LocalProperties();
+		props.ordering = o;
+		props.groupedFields = o.getInvolvedIndexes();
+		return props;
+	}
+	
+	public static LocalProperties forGrouping(FieldList groupedFields) {
+		LocalProperties props = new LocalProperties();
+		props.groupedFields = groupedFields;
+		return props;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
index ee03ce0..2cc7fe2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
@@ -62,7 +62,6 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index d387c69..b389855 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -84,7 +84,6 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index edc6c69..e288600 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -156,7 +156,6 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
 		LocalProperties comb = LocalProperties.combine(in1, in2);
-		comb.clearUniqueFieldSets();
-		return comb;
+		return comb.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
index ef574c6..ebdcb97 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
@@ -62,7 +62,6 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
 	
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
index fa2559e..3a6ad15 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
@@ -40,9 +40,9 @@ public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor
 		if ((in1.getGroupedFields() == null || in1.getGroupedFields().size() == 0) &&
 				in1.getUniqueFields() != null && in1.getUniqueFields().size() > 0)
 		{
-			in1.setGroupedFields(in1.getUniqueFields().iterator().next().toFieldList());
+			return LocalProperties.forGrouping(in1.getUniqueFields().iterator().next().toFieldList());
+		} else {
+			return in1.clearUniqueFieldSets();
 		}
-		in1.clearUniqueFieldSets();
-		return in1;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
index 4a2c9e2..7a81c72 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
@@ -40,9 +40,9 @@ public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor
 		if ((in2.getGroupedFields() == null || in2.getGroupedFields().size() == 0) &&
 				in2.getUniqueFields() != null && in2.getUniqueFields().size() > 0)
 		{
-			in2.setGroupedFields(in2.getUniqueFields().iterator().next().toFieldList());
+			return LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList());
+		} else {
+			return in2.clearUniqueFieldSets();
 		}
-		in2.clearUniqueFieldSets();
-		return in2;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
index 807fe88..c81be04 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
@@ -62,7 +62,6 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
 	
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
index 72f05ee..5cddf62 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
@@ -99,7 +99,6 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 	
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
index 496ab01..980cf6d 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
@@ -129,7 +129,6 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
index f4f090d..a28feeb 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
@@ -74,7 +74,6 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 	
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
index ec4d95c..4539da5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
@@ -96,7 +96,6 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		lProps.clearUniqueFieldSets();
-		return lProps;
+		return lProps.clearUniqueFieldSets();
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
index 17a8bd0..f5d83f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
@@ -89,7 +89,6 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor
 	@Override
 	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
 		LocalProperties comb = LocalProperties.combine(in1, in2);
-		comb.clearUniqueFieldSets();
-		return comb;
+		return comb.clearUniqueFieldSets();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
index ae3dc63..2281a5c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import eu.stratosphere.compiler.costs.Costs;
 import eu.stratosphere.compiler.dag.BulkPartialSolutionNode;
@@ -91,13 +90,13 @@ public class BulkPartialSolutionPlanNode extends PlanNode {
 	}
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList().iterator();
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
 	}
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return Collections.<Channel>emptyList().iterator();
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 1cd6a36..8fe95c9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -386,8 +386,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 					break;
 				case SORT:
 				case COMBININGSORT:
-					this.localProps = new LocalProperties();
-					this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
+					this.localProps = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
 					break;
 				default:
 					throw new CompilerException("Unsupported local strategy for channel.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
index 9fe9e2a..8ad32a8 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
@@ -18,9 +18,8 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
 import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
 
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
-import java.util.NoSuchElementException;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
 import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
@@ -168,31 +167,9 @@ public class DualInputPlanNode extends PlanNode {
 	
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
+	public Iterable<PlanNode> getPredecessors() {
 		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
-			return new Iterator<PlanNode>() {
-				private int hasLeft = 2;
-				@Override
-				public boolean hasNext() {
-					return this.hasLeft > 0;
-				}
-				@Override
-				public PlanNode next() {
-					if (this.hasLeft == 2) {
-						this.hasLeft = 1;
-						return DualInputPlanNode.this.input1.getSource();
-					} else if (this.hasLeft == 1) {
-						this.hasLeft = 0;
-						return DualInputPlanNode.this.input2.getSource();
-					} else {
-						throw new NoSuchElementException();
-					}
-				}
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
+			return Arrays.asList(this.input1.getSource(), this.input2.getSource());
 		} else {
 			List<PlanNode> preds = new ArrayList<PlanNode>();
 			
@@ -203,36 +180,13 @@ public class DualInputPlanNode extends PlanNode {
 				preds.add(c.getSource());
 			}
 			
-			return preds.iterator();
+			return preds;
 		}
 	}
 
-
 	@Override
-	public Iterator<Channel> getInputs() {
-		return new Iterator<Channel>() {
-			private int hasLeft = 2;
-			@Override
-			public boolean hasNext() {
-				return this.hasLeft > 0;
-			}
-			@Override
-			public Channel next() {
-				if (this.hasLeft == 2) {
-					this.hasLeft = 1;
-					return DualInputPlanNode.this.input1;
-				} else if (this.hasLeft == 1) {
-					this.hasLeft = 0;
-					return DualInputPlanNode.this.input2;
-				} else {
-					throw new NoSuchElementException();
-				}
-			}
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
+	public Iterable<Channel> getInputs() {
+		return Arrays.asList(this.input1, this.input2);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
index b011b8f..8009cbe 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
@@ -21,6 +21,7 @@ import eu.stratosphere.compiler.dag.BinaryUnionNode;
 import eu.stratosphere.compiler.dataproperties.GlobalProperties;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.util.IterableIterator;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -55,14 +56,14 @@ public class NAryUnionPlanNode extends PlanNode {
 	}
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return Collections.unmodifiableList(this.inputs).iterator();
+	public Iterable<Channel> getInputs() {
+		return Collections.unmodifiableList(this.inputs);
 	}
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
+	public Iterable<PlanNode> getPredecessors() {
 		final Iterator<Channel> channels = this.inputs.iterator();
-		return new Iterator<PlanNode>() {
+		return new IterableIterator<PlanNode>() {
 
 			@Override
 			public boolean hasNext() {
@@ -78,6 +79,11 @@ public class NAryUnionPlanNode extends PlanNode {
 			public void remove() {
 				throw new UnsupportedOperationException();
 			}
+			
+			@Override
+			public Iterator<PlanNode> iterator() {
+				return this;
+			}
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 4f2c049..16241e4 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -15,7 +15,6 @@ package eu.stratosphere.compiler.plan;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -275,8 +274,9 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		this.cumulativeCosts = nodeCosts.clone();
 		
 		// add all the normal inputs
-		for (Iterator<PlanNode> preds = getPredecessors(); preds.hasNext();) {
-			Costs parentCosts = preds.next().getCumulativeCostsShare();
+		for (PlanNode pred : getPredecessors()) {
+			
+			Costs parentCosts = pred.getCumulativeCostsShare();
 			if (parentCosts != null) {
 				this.cumulativeCosts.addCosts(parentCosts);
 			} else {
@@ -325,10 +325,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	//                               Input, Predecessors, Successors
 	// --------------------------------------------------------------------------------------------
 	
-	public abstract Iterator<Channel> getInputs();
+	public abstract Iterable<Channel> getInputs();
 	
 	@Override
-	public abstract Iterator<PlanNode> getPredecessors();
+	public abstract Iterable<PlanNode> getPredecessors();
 	
 	/**
 	 * Sets a list of all broadcast inputs attached to this node.
@@ -396,7 +396,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		}
 		for (FieldSet fields : uniqueFieldCombinations) {
 			this.globalProps.addUniqueFieldCombination(fields);
-			this.localProps.addUniqueFields(fields);
+			this.localProps = this.localProps.addUniqueFields(fields);
 		}
 	}
 
@@ -461,20 +461,22 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 
 
 	@Override
-	public Iterator<DumpableConnection<PlanNode>> getDumpableInputs() {
+	public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
 		List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
 		
-		for (Iterator<Channel> inputs = getInputs(); inputs.hasNext();) {
-			allInputs.add(inputs.next());
+		for (Channel c : getInputs()) {
+			allInputs.add(c);
 		}
 		
 		for (NamedChannel c : getBroadcastInputs()) {
 			allInputs.add(c);
 		}
 		
-		return allInputs.iterator();
+		return allInputs;
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	
 	public static enum SourceAndDamReport {
 		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
index 7b7b429..29e52ab 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
@@ -18,9 +18,8 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
 import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
 
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
-import java.util.NoSuchElementException;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
 import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
@@ -150,68 +149,28 @@ public class SingleInputPlanNode extends PlanNode {
 
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
+	public Iterable<PlanNode> getPredecessors() {
 		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
-			return new Iterator<PlanNode>() {
-				private boolean hasLeft = true;
-				@Override
-				public boolean hasNext() {
-					return this.hasLeft;
-				}
-				@Override
-				public PlanNode next() {
-					if (this.hasLeft) {
-						this.hasLeft = false;
-						return SingleInputPlanNode.this.input.getSource();
-					} else {
-						throw new NoSuchElementException();
-					}
-				}
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
+			return Collections.singleton(this.input.getSource());
 		}
 		else {
 			List<PlanNode> preds = new ArrayList<PlanNode>();
-			
 			preds.add(input.getSource());
 			
 			for (Channel c : getBroadcastInputs()) {
 				preds.add(c.getSource());
 			}
 			
-			return preds.iterator();
+			return preds;
 		}
 	}
 
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return new Iterator<Channel>() {
-			private boolean hasLeft = true;
-			@Override
-			public boolean hasNext() {
-				return this.hasLeft;
-			}
-			@Override
-			public Channel next() {
-				if (this.hasLeft) {
-					this.hasLeft = false;
-					return SingleInputPlanNode.this.input;
-				} else {
-					throw new NoSuchElementException();
-				}
-			}
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
+	public Iterable<Channel> getInputs() {
+		return Collections.singleton(this.input);
 	}
 
-
 	@Override
 	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
 		if (source == this) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
index 4f47093..ee54c5a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import eu.stratosphere.compiler.costs.Costs;
 import eu.stratosphere.compiler.dag.OptimizerNode;
@@ -92,14 +91,14 @@ public class SolutionSetPlanNode extends PlanNode {
 
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList().iterator();
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
 	}
 
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return Collections.<Channel>emptyList().iterator();
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
index 2e7d86a..b0900d9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
@@ -17,7 +17,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
 import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
 
 import java.util.Collections;
-import java.util.Iterator;
 
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.compiler.dag.DataSourceNode;
@@ -82,14 +81,14 @@ public class SourcePlanNode extends PlanNode {
 
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList().iterator();
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
 	}
 
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return Collections.<Channel>emptyList().iterator();
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
index 851e269..936eb67 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import eu.stratosphere.compiler.costs.Costs;
 import eu.stratosphere.compiler.dag.OptimizerNode;
@@ -93,14 +92,14 @@ public class WorksetPlanNode extends PlanNode {
 
 
 	@Override
-	public Iterator<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList().iterator();
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
 	}
 
 
 	@Override
-	public Iterator<Channel> getInputs() {
-		return Collections.<Channel>emptyList().iterator();
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
index 5ff3a8a..56f14ee 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
@@ -12,8 +12,6 @@
  **********************************************************************************************************************/
 package eu.stratosphere.compiler.plandump;
 
-import java.util.Iterator;
-
 import eu.stratosphere.compiler.dag.OptimizerNode;
 import eu.stratosphere.compiler.plan.PlanNode;
 
@@ -27,9 +25,9 @@ public interface DumpableNode<T extends DumpableNode<T>> {
 	 * 
 	 * @return An iterator over the predecessors.
 	 */
-	Iterator<T> getPredecessors();
+	Iterable<T> getPredecessors();
 	
-	Iterator<DumpableConnection<T>> getDumpableInputs();
+	Iterable<DumpableConnection<T>> getDumpableInputs();
 	
 	OptimizerNode getOptimizerNode();
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index e89a18b..83cb502 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -142,8 +142,7 @@ public class PlanJSONDumpGenerator {
 		this.nodeIds.put(node, this.nodeCnt++);
 		
 		// then recurse
-		for (Iterator<? extends DumpableNode<?>> children = node.getPredecessors(); children.hasNext(); ) {
-			final DumpableNode<?> child = children.next();
+		for (DumpableNode<?> child : node.getPredecessors()) {
 			visit(child, writer, first);
 			first = false;
 		}
@@ -254,7 +253,7 @@ public class PlanJSONDumpGenerator {
 				+ (n.getSubtasksPerInstance() >= 1 ? n.getSubtasksPerInstance() : "default") + "\"");
 
 		// output node predecessors
-		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs();
+		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
 		String child1name = "", child2name = "";
 
 		if (inConns != null && inConns.hasNext()) {
@@ -270,8 +269,8 @@ public class PlanJSONDumpGenerator {
 				if (conn.getSource() instanceof NAryUnionPlanNode) {
 					inConnsForInput = new ArrayList<DumpableConnection<?>>();
 					
-					for (Iterator<? extends DumpableConnection<?>> inputOfUnion = conn.getSource().getDumpableInputs(); inputOfUnion.hasNext();) {
-						inConnsForInput.add(inputOfUnion.next());
+					for (DumpableConnection<?> inputOfUnion : conn.getSource().getDumpableInputs()) {
+						inConnsForInput.add(inputOfUnion);
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index aebf2cf..5a30fb6 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -455,7 +455,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				final TaskInChain chainedTask;
 				if ((chainedTask = this.chainedTasks.get(node)) != null) {
 					// Chained Task. Sanity check first...
-					final Iterator<Channel> inConns = node.getInputs();
+					final Iterator<Channel> inConns = node.getInputs().iterator();
 					if (!inConns.hasNext()) {
 						throw new CompilerException("Bug: Found chained task with no input.");
 					}
@@ -522,7 +522,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			// enclosing iteration node, because the inputs are the initial inputs to the iteration.
 			final Iterator<Channel> inConns;
 			if (node instanceof BulkPartialSolutionPlanNode) {
-				inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs();
+				inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator();
 				// because the partial solution has its own vertex, it has only one (logical) input.
 				// note this in the task configuration
 				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
@@ -536,7 +536,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
 				targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
 			} else {
-				inConns = node.getInputs();
+				inConns = node.getInputs().iterator();
 			}
 			if (!inConns.hasNext()) {
 				throw new CompilerException("Bug: Found a non-source task with no input.");
@@ -578,7 +578,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				
 				// check if the iteration's input is a union
 				if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
-					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs();
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
 				} else {
 					allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
 				}
@@ -597,7 +597,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				
 				// check if the iteration's input is a union
 				if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
-					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs();
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
 				} else {
 					allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
index b39a297..38dd7ec 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
@@ -13,7 +13,6 @@
 
 package eu.stratosphere.compiler.postpass;
 
-import java.util.Iterator;
 import java.util.Map;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
@@ -415,8 +414,8 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
 		else if (node instanceof NAryUnionPlanNode) {
 			// only propagate the info down
 			try {
-				for (Iterator<Channel> channels = node.getInputs(); channels.hasNext(); ) {
-					propagateToChannel(parentSchema, channels.next(), createUtilities);
+				for (Channel channel : node.getInputs()) {
+					propagateToChannel(parentSchema, channel, createUtilities);
 				}
 			} catch (MissingFieldTypeInfoException ex) {
 				throw new CompilerPostPassException("Could not set up runtime strategy for the input channel to " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
index 635c48d..24fd8e5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
@@ -16,7 +16,6 @@ package eu.stratosphere.compiler.postpass;
 
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import eu.stratosphere.api.common.operators.DualInputOperator;
@@ -210,8 +209,8 @@ public class JavaApiPostPass implements OptimizerPostPass {
 		}
 		else if (node instanceof NAryUnionPlanNode){
 			// Traverse to all child channels
-			for (Iterator<Channel> channels = node.getInputs(); channels.hasNext(); ) {
-				traverseChannel(channels.next());
+			for (Channel channel : node.getInputs()) {
+				traverseChannel(channel);
 			}
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index 24d0493..c2b377e 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -81,7 +81,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
 
 					//get the CoGroup
-					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
 					Channel in1 = dpn.getInput1();
 					Channel in2 = dpn.getInput2();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
index b7b2d5c..bcbb6b1 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
@@ -83,8 +83,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
 				if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperator) {
-					for (Iterator<Channel> inputs = visitable.getInputs(); inputs.hasNext();) {
-						final Channel inConn = inputs.next();
+					for (Channel inConn : visitable.getInputs()) {
 						Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
 								inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
 					}
@@ -148,7 +147,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 				 */
 				if (visitable instanceof NAryUnionPlanNode) {
 					int numberInputs = 0;
-					for (Iterator<Channel> inputs = visitable.getInputs(); inputs.hasNext(); numberInputs++) {
+					for (Iterator<Channel> inputs = visitable.getInputs().iterator(); inputs.hasNext(); numberInputs++) {
 						final Channel inConn = inputs.next();
 						PlanNode inNode = inConn.getSource();
 						Assert.assertTrue("Input of Union should be FlatMapOperators",

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
new file mode 100644
index 0000000..16f610a
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.util;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that is also {@link Iterable} (often by returning itself).
+ * 
+ * @param <E> The iterated elements' type.
+ */
+public interface IterableIterator<E> extends Iterator<E>, Iterable<E> {
+}