You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:12 UTC

[2/6] git commit: [FLINK-935] (continued) Correct feedback property check for iterative algorithms.

[FLINK-935] (continued) Correct feedback property check for iterative algorithms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ca2b287a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ca2b287a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ca2b287a

Branch: refs/heads/master
Commit: ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f
Parents: ed7c3f1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 15:05:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroOutputFormatTest.java          |    1 +
 .../compiler/dag/BulkIterationNode.java         |   61 +-
 .../stratosphere/compiler/dag/DataSinkNode.java |    2 +
 .../compiler/dag/SingleInputNode.java           |    2 +
 .../stratosphere/compiler/dag/TwoInputNode.java |   17 +-
 .../compiler/dag/WorksetIterationNode.java      |   55 +-
 .../dataproperties/LocalProperties.java         |    2 +-
 .../RequestedGlobalProperties.java              |    2 -
 .../RequestedLocalProperties.java               |    8 +-
 .../eu/stratosphere/compiler/plan/PlanNode.java |  103 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |    1 -
 .../compiler/FeedbackPropertiesMatchTest.java   | 1432 ++++++++++++++++++
 .../pact/compiler/IterationsCompilerTest.java   |    3 -
 .../testfunctions/DummyJoinFunction.java        |   28 +
 .../example/java/graph/PageRankBasic.java       |    8 +-
 .../pact/runtime/cache/FileCache.java           |    9 +-
 .../pact/runtime/shipping/ShipStrategyType.java |    2 +-
 .../netty/NettyConnectionManagerTest.java       |    2 +-
 .../iterations/ConnectedComponentsTest.java     |    4 +-
 .../iterations/PageRankCompilerTest.java        |  108 ++
 20 files changed, 1807 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
index b0e4b6e..20610a2 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 
+@SuppressWarnings("serial")
 public class AvroOutputFormatTest extends JavaProgramTestBase {
 
 	public static String outputPath1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index b60f427..f6720ea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -13,12 +13,14 @@
 
 package eu.stratosphere.compiler.dag;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import eu.stratosphere.api.common.operators.base.BulkIterationBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
 import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.DataStatistics;
 import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor;
@@ -36,6 +38,9 @@ import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.NamedChannel;
 import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -271,6 +276,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 1) Because we enumerate multiple times, we may need to clean the cached plans
 		//    before starting another enumeration
 		this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+		if (this.terminationCriterion != null) {
+			this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+		}
 		
 		// 2) Give the partial solution the properties of the current candidate for the initial partial solution
 		this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
@@ -282,14 +290,51 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 4) Make sure that the beginning of the step function does not assume properties that 
 		//    are not also produced by the end of the step function.
 
-		for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
-			PlanNode candidate = planDeleter.next();
+		{
+			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
 			
-			// quick-check if the properties at the end of the step function are the same as at the beginning
-			if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
-				continue;
+			for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
+				PlanNode candidate = planDeleter.next();
+				
+				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+				LocalProperties atEndLocal = candidate.getLocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+					; // depends on the partial solution only through a broadcast variable
+				}
+				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+					// attach a no-op node through which we create the properties of the original input
+					Channel toNoOp = new Channel(candidate);
+					globPropsReq.parameterizeChannel(toNoOp, false, false);
+					locPropsReq.parameterizeChannel(toNoOp);
+					
+					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+					
+					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+					estimator.costOperator(rebuildPropertiesPlanNode);
+						
+					GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
+					LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
+						
+					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
+						
+						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+							newCandidates.add(rebuildPropertiesPlanNode);
+						}
+					}
+					
+					planDeleter.remove();
+				}
 			}
-			planDeleter.remove();
+		}
+		
+		if (candidates.isEmpty()) {
+			return;
 		}
 		
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
@@ -302,13 +347,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 				target.add(node);
 			}
 		}
-		else if(candidates.size() > 0) {
+		else if (candidates.size() > 0) {
 			List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
 
 			SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
 			
 			for (PlanNode candidate : candidates) {
-				for(PlanNode terminationCandidate : terminationCriterionCandidates) {
+				for (PlanNode terminationCandidate : terminationCriterionCandidates) {
 					if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
 						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
 						GlobalProperties gProps = candidate.getGlobalProperties().clone();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 15c7670..fe823d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -210,6 +210,8 @@ public class DataSinkNode extends OptimizerNode {
 					Channel c = new Channel(p);
 					gp.parameterizeChannel(c, globalDopChange, localDopChange);
 					lp.parameterizeChannel(c);
+					c.setRequiredLocalProps(lp);
+					c.setRequiredGlobalProps(gp);
 					
 					// no need to check whether the created properties meet what we need in case
 					// of ordering or global ordering, because the only interesting properties we have

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index fd4b76a..e1727f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -312,6 +312,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 					// requested properties
 					for (RequestedGlobalProperties rgps: allValidGlobals) {
 						if (rgps.isMetBy(c.getGlobalProperties())) {
+							c.setRequiredGlobalProps(rgps);
 							addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
 							break;
 						}
@@ -365,6 +366,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 			for (OperatorDescriptorSingle dps: getPossibleProperties()) {
 				for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
 					if (ilps.isMetBy(in.getLocalProperties())) {
+						in.setRequiredLocalProps(ilps);
 						instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
 						break outer;
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index cccbeba..9898c81 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -447,9 +447,13 @@ public abstract class TwoInputNode extends OptimizerNode {
 							if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
 								gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
 							{
+								Channel c1Clone = c1.clone();
+								c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+								c2.setRequiredGlobalProps(gpp.getProperties2());
+								
 								// we form a valid combination, so create the local candidates
 								// for this
-								addLocalCandidates(c1, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+								addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
 								break;
 							}
 						}
@@ -495,7 +499,6 @@ public abstract class TwoInputNode extends OptimizerNode {
 				final Channel in2 = template2.clone();
 				ilp2.parameterizeChannel(in2);
 				
-				allPossibleLoop:
 				for (OperatorDescriptorDual dps: this.possibleProperties) {
 					for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
 						if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
@@ -507,12 +510,16 @@ public abstract class TwoInputNode extends OptimizerNode {
 							if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
 								in1.getLocalProperties(), in2.getLocalProperties()))
 							{
+								Channel in1Copy = in1.clone();
+								in1Copy.setRequiredLocalProps(lpp.getProperties1());
+								in2.setRequiredLocalProps(lpp.getProperties2());
+								
 								// all right, co compatible
-								instantiate(dps, in1, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
-								break allPossibleLoop;
+								instantiate(dps, in1Copy, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+								break;
 							} else {
 								// meet, but not co-compatible
-								throw new CompilerException("Implements to adjust one side to the other!");
+//								throw new CompilerException("Implements to adjust one side to the other!");
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 76c4402..2c70794 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -41,6 +41,7 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
 import eu.stratosphere.compiler.plan.SolutionSetPlanNode;
 import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
 import eu.stratosphere.compiler.plan.WorksetPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
 import eu.stratosphere.compiler.util.NoOpBinaryUdfOp;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
@@ -279,7 +280,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	@Override
 	protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
 			List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-			RequestedGlobalProperties globPropsReqSolutionSet,RequestedGlobalProperties globPropsReqWorkset,
+			RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
 			RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
 	{
 		// check for pipeline breaking using hash join with build on the solution set side
@@ -314,12 +315,54 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		//    initial partial solution
 		
 		// Make sure that the workset candidates fulfill the input requirements
-		for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
-			PlanNode candidate = planDeleter.next();
-			if (!(globPropsReqWorkset.isMetBy(candidate.getGlobalProperties()) && locPropsReqWorkset.isMetBy(candidate.getLocalProperties()))) {
-				planDeleter.remove();
+		{
+			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+			
+			for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
+				PlanNode candidate = planDeleter.next();
+				
+				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+				LocalProperties atEndLocal = candidate.getLocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
+				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+					; // depends on the workset only through a broadcast variable
+				}
+				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+					// attach a no-op node through which we create the properties of the original input
+					Channel toNoOp = new Channel(candidate);
+					globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
+					locPropsReqWorkset.parameterizeChannel(toNoOp);
+					
+					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
+					
+					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+					
+					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+					rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+					estimator.costOperator(rebuildWorksetPropertiesPlanNode);
+						
+					GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
+					LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
+						
+					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobalModified, atEndLocalModified);
+						
+						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+							newCandidates.add(rebuildWorksetPropertiesPlanNode);
+						}
+					}
+					
+					// remove the original operator and add the modified candidate
+					planDeleter.remove();
+					
+				}
 			}
+			
+			worksetCandidates.addAll(newCandidates);
 		}
+		
 		if (worksetCandidates.isEmpty()) {
 			return;
 		}
@@ -342,7 +385,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		gp.setHashPartitioned(this.solutionSetKeyFields);
 		gp.addUniqueFieldCombination(this.solutionSetKeyFields);
 		
-		LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
+		LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
 		
 		// take all combinations of solution set delta and workset plans
 		for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index b45e341..f49c130 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -27,7 +27,7 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
  */
 public class LocalProperties implements Cloneable {
 	
-	public static final LocalProperties TRIVIAL = new LocalProperties();
+	public static final LocalProperties EMPTY = new LocalProperties();
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 3935f5e..574922a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -229,8 +229,6 @@ public final class RequestedGlobalProperties implements Cloneable {
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
 		if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
-			channel.setRequiredGlobalProps(this);
-			
 			// we meet already everything, so go forward
 			channel.setShipStrategy(ShipStrategyType.FORWARD);
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index e714cea..aeae0d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -33,9 +33,9 @@ public class RequestedLocalProperties implements Cloneable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	Ordering ordering;			// order inside a partition, null if not ordered
+	private Ordering ordering;			// order inside a partition, null if not ordered
 
-	FieldSet groupedFields;		// fields by which the stream is grouped. null if not grouped.
+	private FieldSet groupedFields;		// fields by which the stream is grouped. null if not grouped.
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -190,8 +190,8 @@ public class RequestedLocalProperties implements Cloneable {
 		LocalProperties current = channel.getLocalProperties();
 		
 		if (isMetBy(current)) {
-			// we are met. record that this is needed.
-			channel.setRequiredLocalProps(this);
+			// we are met, all is good
+			channel.setLocalStrategy(LocalStrategy.NONE);
 		}
 		else if (this.ordering != null) {
 			channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 16241e4..69263bc 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -29,7 +29,9 @@ import eu.stratosphere.compiler.dataproperties.GlobalProperties;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.plandump.DumpableConnection;
 import eu.stratosphere.compiler.plandump.DumpableNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitable;
 
 /**
@@ -434,8 +436,85 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * Checks whether this node has a dam on the way down to the given source node. This method
+	 * returns either that (a) the source node is not found as a (transitive) child of this node,
+	 * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+	 * the path.
+	 * 
+	 * @param source The node on the path to which the dam is sought.
+	 * @return The result whether the node is found and whether a dam is on the path.
+	 */
 	public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
 	
+	public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+		if (this == partialSolution) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+		
+		boolean found = false;
+		boolean allMet = true;
+		boolean allLocallyMet = true;
+		
+		for (Channel input : getInputs()) {
+			FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+			
+			if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+				found = true;
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+				return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+			}
+			else {
+				found = true;
+				
+				// the partial solution was on the path here. check whether the channel requires
+				// certain properties that are met, or whether the channel introduces new properties
+				
+				// if the plan introduces new global properties, then we can stop looking whether
+				// the feedback properties are sufficient to meet the requirements
+				if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+					continue;
+				}
+				
+				// first check whether this channel requires something that is not met
+				if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+					return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+				}
+				
+				// in general, not everything is met here already
+				allMet = false;
+				
+				// if the plan introduces new local properties, we can stop checking for matching local properties
+				if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+					
+					if (input.getLocalStrategy() == LocalStrategy.NONE) {
+						
+						if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+							return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+						}
+						
+						allLocallyMet = false;
+					}
+				}
+			}
+		}
+		
+		if (!found) {
+			return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+		} else if (allMet) {
+			return FeedbackPropertiesMeetRequirementsReport.MET;
+		} else if (allLocallyMet) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+		} else {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 
@@ -447,19 +526,16 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	
 	// --------------------------------------------------------------------------------------------
 	
-
 	@Override
 	public OptimizerNode getOptimizerNode() {
 		return this.template;
 	}
 
-
 	@Override
 	public PlanNode getPlanNode() {
 		return this;
 	}
 
-
 	@Override
 	public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
 		List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
@@ -480,4 +556,25 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	public static enum SourceAndDamReport {
 		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
 	}
+	
+	
+	
+	public static enum FeedbackPropertiesMeetRequirementsReport {
+		/** Indicates that the path is irrelevant */
+		NO_PARTIAL_SOLUTION,
+		
+		/** Indicates that the question whether the properties are met is still pending and
+		 * depends on both the global and the local properties */
+		PENDING,
+		
+		/** Indicates that the question whether the properties are met is still pending and
+		 * depends on the global properties only (the local properties are already met) */
+		PENDING_LOCAL_MET,
+		
+		/** Indicates that the question whether the properties are met has been determined true */
+		MET,
+		
+		/** Indicates that the question whether the properties are met has been determined false */
+		NOT_MET;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index c2b377e..1c99558 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -76,7 +76,6 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 		oPlan.accept(new Visitor<PlanNode>() {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				System.out.println(visitable);
 				if (visitable instanceof WorksetIterationPlanNode) {
 					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();