You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:12 UTC
[2/6] git commit: [FLINK-935] (continued) Correct feedback property
check for iterative algorithms.
[FLINK-935] (continued) Correct feedback property check for iterative algorithms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ca2b287a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ca2b287a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ca2b287a
Branch: refs/heads/master
Commit: ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f
Parents: ed7c3f1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 15:05:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../api/avro/AvroOutputFormatTest.java | 1 +
.../compiler/dag/BulkIterationNode.java | 61 +-
.../stratosphere/compiler/dag/DataSinkNode.java | 2 +
.../compiler/dag/SingleInputNode.java | 2 +
.../stratosphere/compiler/dag/TwoInputNode.java | 17 +-
.../compiler/dag/WorksetIterationNode.java | 55 +-
.../dataproperties/LocalProperties.java | 2 +-
.../RequestedGlobalProperties.java | 2 -
.../RequestedLocalProperties.java | 8 +-
.../eu/stratosphere/compiler/plan/PlanNode.java | 103 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 1 -
.../compiler/FeedbackPropertiesMatchTest.java | 1432 ++++++++++++++++++
.../pact/compiler/IterationsCompilerTest.java | 3 -
.../testfunctions/DummyJoinFunction.java | 28 +
.../example/java/graph/PageRankBasic.java | 8 +-
.../pact/runtime/cache/FileCache.java | 9 +-
.../pact/runtime/shipping/ShipStrategyType.java | 2 +-
.../netty/NettyConnectionManagerTest.java | 2 +-
.../iterations/ConnectedComponentsTest.java | 4 +-
.../iterations/PageRankCompilerTest.java | 108 ++
20 files changed, 1807 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
index b0e4b6e..20610a2 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
+@SuppressWarnings("serial")
public class AvroOutputFormatTest extends JavaProgramTestBase {
public static String outputPath1;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index b60f427..f6720ea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -13,12 +13,14 @@
package eu.stratosphere.compiler.dag;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import eu.stratosphere.api.common.operators.base.BulkIterationBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor;
@@ -36,6 +38,9 @@ import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.NamedChannel;
import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -271,6 +276,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 1) Because we enumerate multiple times, we may need to clean the cached plans
// before starting another enumeration
this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+ if (this.terminationCriterion != null) {
+ this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+ }
// 2) Give the partial solution the properties of the current candidate for the initial partial solution
this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
@@ -282,14 +290,51 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 4) Make sure that the beginning of the step function does not assume properties that
// are not also produced by the end of the step function.
- for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
- PlanNode candidate = planDeleter.next();
+ {
+ List<PlanNode> newCandidates = new ArrayList<PlanNode>();
- // quick-check if the properties at the end of the step function are the same as at the beginning
- if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
- continue;
+ for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
+ PlanNode candidate = planDeleter.next();
+
+ GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+ LocalProperties atEndLocal = candidate.getLocalProperties();
+
+ FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+ if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ ; // depends only through broadcast variable on the partial solution
+ }
+ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ // attach a no-op node through which we create the properties of the original input
+ Channel toNoOp = new Channel(candidate);
+ globPropsReq.parameterizeChannel(toNoOp, false, false);
+ locPropsReq.parameterizeChannel(toNoOp);
+
+ UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+ rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+ rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+
+ SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+ rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+ estimator.costOperator(rebuildPropertiesPlanNode);
+
+ GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
+ LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
+
+ if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+ FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
+
+ if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ newCandidates.add(rebuildPropertiesPlanNode);
+ }
+ }
+
+ planDeleter.remove();
+ }
}
- planDeleter.remove();
+ }
+
+ if (candidates.isEmpty()) {
+ return;
}
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
@@ -302,13 +347,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
target.add(node);
}
}
- else if(candidates.size() > 0) {
+ else if (candidates.size() > 0) {
List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
for (PlanNode candidate : candidates) {
- for(PlanNode terminationCandidate : terminationCriterionCandidates) {
+ for (PlanNode terminationCandidate : terminationCriterionCandidates) {
if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 15c7670..fe823d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -210,6 +210,8 @@ public class DataSinkNode extends OptimizerNode {
Channel c = new Channel(p);
gp.parameterizeChannel(c, globalDopChange, localDopChange);
lp.parameterizeChannel(c);
+ c.setRequiredLocalProps(lp);
+ c.setRequiredGlobalProps(gp);
// no need to check whether the created properties meet what we need in case
// of ordering or global ordering, because the only interesting properties we have
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index fd4b76a..e1727f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -312,6 +312,7 @@ public abstract class SingleInputNode extends OptimizerNode {
// requested properties
for (RequestedGlobalProperties rgps: allValidGlobals) {
if (rgps.isMetBy(c.getGlobalProperties())) {
+ c.setRequiredGlobalProps(rgps);
addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
break;
}
@@ -365,6 +366,7 @@ public abstract class SingleInputNode extends OptimizerNode {
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
+ in.setRequiredLocalProps(ilps);
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
break outer;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index cccbeba..9898c81 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -447,9 +447,13 @@ public abstract class TwoInputNode extends OptimizerNode {
if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) &&
gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
{
+ Channel c1Clone = c1.clone();
+ c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+ c2.setRequiredGlobalProps(gpp.getProperties2());
+
// we form a valid combination, so create the local candidates
// for this
- addLocalCandidates(c1, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+ addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
break;
}
}
@@ -495,7 +499,6 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel in2 = template2.clone();
ilp2.parameterizeChannel(in2);
- allPossibleLoop:
for (OperatorDescriptorDual dps: this.possibleProperties) {
for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
@@ -507,12 +510,16 @@ public abstract class TwoInputNode extends OptimizerNode {
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
+ Channel in1Copy = in1.clone();
+ in1Copy.setRequiredLocalProps(lpp.getProperties1());
+ in2.setRequiredLocalProps(lpp.getProperties2());
+
// all right, co compatible
- instantiate(dps, in1, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
- break allPossibleLoop;
+ instantiate(dps, in1Copy, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+ break;
} else {
// meet, but not co-compatible
- throw new CompilerException("Implements to adjust one side to the other!");
+// throw new CompilerException("Implements to adjust one side to the other!");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 76c4402..2c70794 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -41,6 +41,7 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SolutionSetPlanNode;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
import eu.stratosphere.compiler.plan.WorksetPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import eu.stratosphere.compiler.util.NoOpBinaryUdfOp;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
@@ -279,7 +280,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
@Override
protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
- RequestedGlobalProperties globPropsReqSolutionSet,RequestedGlobalProperties globPropsReqWorkset,
+ RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
{
// check for pipeline breaking using hash join with build on the solution set side
@@ -314,12 +315,54 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// initial partial solution
// Make sure that the workset candidates fulfill the input requirements
- for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
- PlanNode candidate = planDeleter.next();
- if (!(globPropsReqWorkset.isMetBy(candidate.getGlobalProperties()) && locPropsReqWorkset.isMetBy(candidate.getLocalProperties()))) {
- planDeleter.remove();
+ {
+ List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+
+ for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
+ PlanNode candidate = planDeleter.next();
+
+ GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+ LocalProperties atEndLocal = candidate.getLocalProperties();
+
+ FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
+ if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ ; // depends only through broadcast variable on the workset solution
+ }
+ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ // attach a no-op node through which we create the properties of the original input
+ Channel toNoOp = new Channel(candidate);
+ globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
+ locPropsReqWorkset.parameterizeChannel(toNoOp);
+
+ UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
+
+ rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+ rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+
+ SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+ rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+ estimator.costOperator(rebuildWorksetPropertiesPlanNode);
+
+ GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
+ LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
+
+ if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+ FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobalModified, atEndLocalModified);
+
+ if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ newCandidates.add(rebuildWorksetPropertiesPlanNode);
+ }
+ }
+
+ // remove the original operator and add the modified candidate
+ planDeleter.remove();
+
+ }
}
+
+ worksetCandidates.addAll(newCandidates);
}
+
if (worksetCandidates.isEmpty()) {
return;
}
@@ -342,7 +385,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
gp.setHashPartitioned(this.solutionSetKeyFields);
gp.addUniqueFieldCombination(this.solutionSetKeyFields);
- LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
+ LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
// take all combinations of solution set delta and workset plans
for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index b45e341..f49c130 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -27,7 +27,7 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
*/
public class LocalProperties implements Cloneable {
- public static final LocalProperties TRIVIAL = new LocalProperties();
+ public static final LocalProperties EMPTY = new LocalProperties();
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 3935f5e..574922a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -229,8 +229,6 @@ public final class RequestedGlobalProperties implements Cloneable {
final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
// if we have no global parallelism change, check if we have already compatible global properties
if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
- channel.setRequiredGlobalProps(this);
-
// we meet already everything, so go forward
channel.setShipStrategy(ShipStrategyType.FORWARD);
return;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index e714cea..aeae0d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -33,9 +33,9 @@ public class RequestedLocalProperties implements Cloneable {
// --------------------------------------------------------------------------------------------
- Ordering ordering; // order inside a partition, null if not ordered
+ private Ordering ordering; // order inside a partition, null if not ordered
- FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
+ private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
// --------------------------------------------------------------------------------------------
@@ -190,8 +190,8 @@ public class RequestedLocalProperties implements Cloneable {
LocalProperties current = channel.getLocalProperties();
if (isMetBy(current)) {
- // we are met. record that this is needed.
- channel.setRequiredLocalProps(this);
+ // we are met, all is good
+ channel.setLocalStrategy(LocalStrategy.NONE);
}
else if (this.ordering != null) {
channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 16241e4..69263bc 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -29,7 +29,9 @@ import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.plandump.DumpableConnection;
import eu.stratosphere.compiler.plandump.DumpableNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitable;
/**
@@ -434,8 +436,85 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// --------------------------------------------------------------------------------------------
+ /**
+ * Checks whether this node has a dam on the way down to the given source node. This method
+ * returns either that (a) the source node is not found as a (transitive) child of this node,
+ * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+ * the path.
+ *
+ * @param source The node on the path to which the dam is sought.
+ * @return The result whether the node is found and whether a dam is on the path.
+ */
public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
+ public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+ if (this == partialSolution) {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+ // not the partial solution itself: recurse into every input and test the requirements recorded on its channel against the feedback properties
+ boolean found = false;
+ boolean allMet = true;
+ boolean allLocallyMet = true;
+ // found: some input reported anything other than NO_PARTIAL_SOLUTION; allMet / allLocallyMet: cleared below for pending inputs whose channel uses ship strategy FORWARD/NONE (and, for the local flag, LocalStrategy.NONE)
+ for (Channel input : getInputs()) {
+ FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+
+ if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+ found = true;
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+ else {
+ found = true;
+
+ // the input reported PENDING or PENDING_LOCAL_MET, so the partial solution is on this path.
+ // check whether the channel requires properties that must be met, or whether it introduces new ones
+
+ // any ship strategy other than FORWARD / NONE introduces new global properties, so the
+ // feedback properties no longer matter for this input and nothing further is checked
+ if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+ continue;
+ }
+
+ // the channel keeps the global properties: fail if it records a global requirement the feedback does not meet
+ if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ // this channel passed, but the overall result stays pending (the final MET branch is ruled out)
+ allMet = false;
+
+ // local requirements are only checked while the input is still locally pending and the channel applies no local strategy
+ if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+
+ if (input.getLocalStrategy() == LocalStrategy.NONE) {
+
+ if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ allLocallyMet = false;
+ }
+ }
+ }
+ }
+ // fold the per-input findings into one report; NOT_MET has already been returned above where it applied
+ if (!found) {
+ return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+ } else if (allMet) {
+ return FeedbackPropertiesMeetRequirementsReport.MET;
+ } else if (allLocallyMet) {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+ } else {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
@@ -447,19 +526,16 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// --------------------------------------------------------------------------------------------
-
@Override
public OptimizerNode getOptimizerNode() {
return this.template;
}
-
@Override
public PlanNode getPlanNode() {
return this;
}
-
@Override
public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
@@ -480,4 +556,25 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
public static enum SourceAndDamReport {
NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
}
+
+
+ /** Outcome reported by checkPartialSolutionPropertiesMet(...) for a plan node and the inputs below it. */
+ public static enum FeedbackPropertiesMeetRequirementsReport {
+ /** The partial solution was not found among the inputs, so this path is irrelevant */
+ NO_PARTIAL_SOLUTION,
+
+ /** Not yet decided: whether the properties are met still depends on both
+ * the global and the local feedback properties */
+ PENDING,
+
+ /** Not yet decided: whether the properties are met still depends on
+ * the global feedback properties only */
+ PENDING_LOCAL_MET,
+
+ /** Decided: the properties are met */
+ MET,
+
+ /** Decided: the properties are not met */
+ NOT_MET;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index c2b377e..1c99558 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -76,7 +76,6 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
oPlan.accept(new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
- System.out.println(visitable);
if (visitable instanceof WorksetIterationPlanNode) {
PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();