You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:16 UTC
[6/6] git commit: [FLINK-935] Fix compiler logic that pushed work out of the iteration loop.
[FLINK-935] Fix compiler logic that pushed work out of the iteration loop.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/31a37393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/31a37393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/31a37393
Branch: refs/heads/master
Commit: 31a373930bce27a0d4abc2cbcff4284d5af9002a
Parents: 229754d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 19:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../compiler/costs/CostEstimator.java | 2 -
.../compiler/dag/BulkIterationNode.java | 12 +-
.../compiler/dag/BulkPartialSolutionNode.java | 2 +-
.../stratosphere/compiler/dag/DataSinkNode.java | 8 +-
.../compiler/dag/SingleInputNode.java | 11 +-
.../stratosphere/compiler/dag/TwoInputNode.java | 16 +-
.../compiler/dag/WorksetIterationNode.java | 2 +-
.../RequestedGlobalProperties.java | 46 +---
.../RequestedLocalProperties.java | 14 +-
.../eu/stratosphere/compiler/plan/Channel.java | 76 +++---
.../plandump/PlanJSONDumpGenerator.java | 3 -
.../plantranslate/NepheleJobGraphGenerator.java | 1 -
.../pact/compiler/DOPChangeTest.java | 4 +-
.../pact/compiler/IterationsCompilerTest.java | 265 +++++++++++++++++++
.../MultipleIterationsCompilerTest.java | 221 ----------------
.../WorksetEmptyConvergenceCriterion.java | 2 +
.../pact/runtime/shipping/OutputEmitter.java | 2 -
.../runtime/shipping/RecordOutputEmitter.java | 2 -
.../pact/runtime/shipping/ShipStrategyType.java | 6 -
.../test/util/testjar/KMeansForTest.java | 2 +-
20 files changed, 345 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 492204b..11fb45b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -94,8 +94,6 @@ public abstract class CostEstimator {
case FORWARD:
// costs.addHeuristicNetworkCost(channel.getMaxDepth());
break;
- case PARTITION_LOCAL_HASH:
- break;
case PARTITION_RANDOM:
addRandomPartitioningCost(channel, costs);
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index c547da9..b60f427 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -279,13 +279,17 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 3) Get the alternative plans
List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
- // 4) Throw away all that are not compatible with the properties currently requested to the
- // initial partial solution
+ // 4) Make sure that the beginning of the step function does not assume properties that
+ // are not also produced by the end of the step function.
+
for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
PlanNode candidate = planDeleter.next();
- if (!(globPropsReq.isMetBy(candidate.getGlobalProperties()) && locPropsReq.isMetBy(candidate.getLocalProperties()))) {
- planDeleter.remove();
+
+ // quick-check if the properties at the end of the step function are the same as at the beginning
+ if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
+ continue;
}
+ planDeleter.remove();
}
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
index 449a2fc..592768b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
@@ -43,7 +43,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
if (this.cachedPlans != null) {
throw new IllegalStateException();
} else {
- this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "BulkPartialSolution("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+ this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 3f7b224..15c7670 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -31,7 +31,6 @@ import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -210,12 +209,7 @@ public class DataSinkNode extends OptimizerNode {
for (RequestedLocalProperties lp : ips.getLocalProperties()) {
Channel c = new Channel(p);
gp.parameterizeChannel(c, globalDopChange, localDopChange);
-
- if (lp.isMetBy(c.getLocalPropertiesAfterShippingOnly())) {
- c.setLocalStrategy(LocalStrategy.NONE);
- } else {
- lp.parameterizeChannel(c);
- }
+ lp.parameterizeChannel(c);
// no need to check whether the created properties meet what we need in case
// of ordering or global ordering, because the only interesting properties we have
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 2a9e5c6..fd4b76a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -42,7 +42,6 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.util.NoOpUnaryUdfOp;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -357,21 +356,17 @@ public abstract class SingleInputNode extends OptimizerNode {
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
List<PlanNode> target, CostEstimator estimator)
{
- final LocalProperties lp = template.getLocalPropertiesAfterShippingOnly();
for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
final Channel in = template.clone();
- if (ilp.isMetBy(lp)) {
- in.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp.parameterizeChannel(in);
- }
+ ilp.parameterizeChannel(in);
// instantiate a candidate, if the instantiated local properties meet one possible local property set
+ outer:
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
- break;
+ break outer;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 5046ceb..cccbeba 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -51,7 +51,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DamBehavior;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -488,24 +487,13 @@ public abstract class TwoInputNode extends OptimizerNode {
RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
- final LocalProperties lp1 = template1.getLocalPropertiesAfterShippingOnly();
- final LocalProperties lp2 = template2.getLocalPropertiesAfterShippingOnly();
-
for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
final Channel in1 = template1.clone();
- if (ilp1.isMetBy(lp1)) {
- in1.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp1.parameterizeChannel(in1);
- }
+ ilp1.parameterizeChannel(in1);
for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
final Channel in2 = template2.clone();
- if (ilp2.isMetBy(lp2)) {
- in2.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp2.parameterizeChannel(in2);
- }
+ ilp2.parameterizeChannel(in2);
allPossibleLoop:
for (OperatorDescriptorDual dps: this.possibleProperties) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index be02e01..94580ad 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -397,7 +397,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(result1, result2, inputsMerged1);
+ mergeLists(result1, result2, inputsMerged1); // this method also sets which branches are joined here (in the head)
addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 06d7371..3935f5e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -17,8 +17,6 @@ import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.compiler.CompilerException;
-import eu.stratosphere.compiler.costs.CostEstimator;
-import eu.stratosphere.compiler.costs.Costs;
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.util.Utils;
@@ -31,8 +29,8 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
* Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
* or an FieldSet with the hash partitioning columns.
*/
-public final class RequestedGlobalProperties implements Cloneable
-{
+public final class RequestedGlobalProperties implements Cloneable {
+
private PartitioningProperty partitioning; // the type partitioning
private FieldSet partitioningFields; // the fields which are partitioned
@@ -230,20 +228,12 @@ public final class RequestedGlobalProperties implements Cloneable
final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
// if we have no global parallelism change, check if we have already compatible global properties
- if (!globalDopChange && isMetBy(inGlobals)) {
- if (localDopChange) {
- // if the local degree of parallelism changes, we need to adjust
- if (inGlobals.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
- // to preserve the hash partitioning, we need to locally hash re-partition
- channel.setShipStrategy(ShipStrategyType.PARTITION_LOCAL_HASH, inGlobals.getPartitioningFields());
- return;
- }
- // else fall though
- } else {
- // we meet already everything, so go forward
- channel.setShipStrategy(ShipStrategyType.FORWARD);
- return;
- }
+ if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
+ channel.setRequiredGlobalProps(this);
+
+ // we meet already everything, so go forward
+ channel.setShipStrategy(ShipStrategyType.FORWARD);
+ return;
}
// if we fall through the conditions until here, we need to re-establish
@@ -266,26 +256,6 @@ public final class RequestedGlobalProperties implements Cloneable
throw new CompilerException();
}
}
-
- public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, OptimizerNode source, OptimizerNode target) {
- if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) {
- return;
- } else {
- switch (this.partitioning) {
- case FULL_REPLICATION:
- estimator.addBroadcastCost(source, target.getDegreeOfParallelism(), to);
- case ANY_PARTITIONING:
- case HASH_PARTITIONED:
- estimator.addHashPartitioningCost(source, to);
- break;
- case RANGE_PARTITIONED:
- estimator.addRangePartitionCost(source, to);
- break;
- default:
- throw new CompilerException();
- }
- }
- }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index 7e67f5f..e714cea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -187,13 +187,23 @@ public class RequestedLocalProperties implements Cloneable {
* @param channel The channel to parameterize.
*/
public void parameterizeChannel(Channel channel) {
- if (this.ordering != null) {
+ LocalProperties current = channel.getLocalProperties();
+
+ if (isMetBy(current)) {
+ // we are met. record that this is needed.
+ channel.setRequiredLocalProps(this);
+ }
+ else if (this.ordering != null) {
channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
- } else if (this.groupedFields != null) {
+ }
+ else if (this.groupedFields != null) {
boolean[] dirs = new boolean[this.groupedFields.size()];
Arrays.fill(dirs, true);
channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs);
}
+ else {
+ channel.setLocalStrategy(LocalStrategy.NONE);
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 3377caf..1cd6a36 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -22,6 +22,8 @@ import eu.stratosphere.compiler.dag.EstimateProvider;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
import eu.stratosphere.compiler.plandump.DumpableConnection;
import eu.stratosphere.compiler.util.Utils;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
@@ -48,6 +50,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
private boolean[] localSortOrder;
+ private RequestedGlobalProperties requiredGlobalProps;
+
+ private RequestedLocalProperties requiredLocalProps;
+
private GlobalProperties globalProps;
private LocalProperties localProps;
@@ -329,6 +335,22 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
// --------------------------------------------------------------------------------------------
+ public RequestedGlobalProperties getRequiredGlobalProps() {
+ return requiredGlobalProps;
+ }
+
+ public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+ this.requiredGlobalProps = requiredGlobalProps;
+ }
+
+ public RequestedLocalProperties getRequiredLocalProps() {
+ return requiredLocalProps;
+ }
+
+ public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+ this.requiredLocalProps = requiredLocalProps;
+ }
+
public GlobalProperties getGlobalProperties() {
if (this.globalProps == null) {
this.globalProps = this.source.getGlobalProperties().clone();
@@ -348,19 +370,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
case PARTITION_RANDOM:
this.globalProps.reset();
break;
- case PARTITION_LOCAL_HASH:
- if (getSource().getGlobalProperties().isPartitionedOnFields(this.shipKeys)) {
- // after a local hash partitioning, we can only state that the data is somehow
- // partitioned. even if we had a hash partitioning before over 8 partitions,
- // locally rehashing that onto 16 partitions (each one partition into two) gives you
- // a different result than directly hashing to 16 partitions. the hash-partitioning
- // property is only valid, if the assumed built in hash function is directly used.
- // hence, we can only state that this is some form of partitioning.
- this.globalProps.setAnyPartitioning(this.shipKeys);
- } else {
- this.globalProps.reset();
- }
- break;
case NONE:
throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
}
@@ -371,12 +380,13 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
public LocalProperties getLocalProperties() {
if (this.localProps == null) {
- this.localProps = getLocalPropertiesAfterShippingOnly().clone();
+ computeLocalPropertiesAfterShippingOnly();
switch (this.localStrategy) {
case NONE:
break;
case SORT:
case COMBININGSORT:
+ this.localProps = new LocalProperties();
this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
break;
default:
@@ -387,25 +397,19 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
return this.localProps;
}
- public LocalProperties getLocalPropertiesAfterShippingOnly() {
- if (this.shipStrategy == ShipStrategyType.FORWARD) {
- return this.source.getLocalProperties();
- } else {
- final LocalProperties props = this.source.getLocalProperties().clone();
- switch (this.shipStrategy) {
- case BROADCAST:
- case PARTITION_HASH:
- case PARTITION_RANGE:
- case PARTITION_RANDOM:
- props.reset();
- break;
- case PARTITION_LOCAL_HASH:
- case FORWARD:
- break;
- case NONE:
- throw new CompilerException("ShipStrategy has not yet been set.");
- }
- return props;
+ private void computeLocalPropertiesAfterShippingOnly() {
+ switch (this.shipStrategy) {
+ case BROADCAST:
+ case PARTITION_HASH:
+ case PARTITION_RANGE:
+ case PARTITION_RANDOM:
+ this.localProps = new LocalProperties();
+ break;
+ case FORWARD:
+ this.localProps = this.source.getLocalProperties();
+ break;
+ case NONE:
+ throw new CompilerException("ShipStrategy has not yet been set.");
}
}
@@ -423,8 +427,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
// some strategies globally reestablish properties
switch (this.shipStrategy) {
case FORWARD:
- case PARTITION_LOCAL_HASH:
- throw new CompilerException("Cannot use FORWARD or LOCAL_HASH strategy between operations " +
+ throw new CompilerException("Cannot use FORWARD strategy between operations " +
"with different number of parallel instances.");
case NONE: // excluded by sanity check. lust here for verification check completion
case BROADCAST:
@@ -452,8 +455,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
case FORWARD:
this.globalProps.reset();
return;
- case NONE: // excluded by sanity check. lust here for verification check completion
- case PARTITION_LOCAL_HASH:
+ case NONE: // excluded by sanity check. just here to silence compiler warnings check completion
case BROADCAST:
case PARTITION_HASH:
case PARTITION_RANGE:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index 183ee2f..e89a18b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -319,9 +319,6 @@ public class PlanJSONDumpGenerator {
case PARTITION_RANGE:
shipStrategy = "Range Partition";
break;
- case PARTITION_LOCAL_HASH:
- shipStrategy = "Hash Partition (local)";
- break;
case PARTITION_RANDOM:
shipStrategy = "Redistribute";
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 4817095..aebf2cf 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1041,7 +1041,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
switch (channel.getShipStrategy()) {
case FORWARD:
- case PARTITION_LOCAL_HASH:
distributionPattern = DistributionPattern.POINTWISE;
channelType = ChannelType.NETWORK;
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 9f53c30..273c42c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,8 +209,8 @@ public class DOPChangeTest extends CompilerTestBase {
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_LOCAL_HASH, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, reduceIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
new file mode 100644
index 0000000..a123099
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -0,0 +1,265 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+
+
+/**
+ * Optimizer tests for programs that contain bulk and delta (workset) iterations. Each test compiles a small
+ * program without statistics and checks the ship strategies and temp modes chosen around the iterations.
+ * <p>
+ * Exceptions are deliberately not caught: the test methods declare {@code throws Exception}, so JUnit reports
+ * any failure during plan construction or compilation with its complete stack trace and cause.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class IterationsCompilerTest extends CompilerTestBase {
+
+	/**
+	 * Bulk iteration, then a map, then a delta iteration. The single sink must be fed by the delta iteration,
+	 * whose first input must be hash partitioned and whose second input must be temped in a pipeline-breaking mode.
+	 */
+	@Test
+	public void testTwoIterationsWithMapperInbetween() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(8);
+
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+		DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+		DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
+		DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
+		depResult.print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		assertEquals(1, op.getDataSinks().size());
+		assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+
+		WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+
+		assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+		assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+	}
+
+	/**
+	 * Bulk iteration feeding a delta iteration directly, without an operator in between. The expectations on the
+	 * delta iteration are the same as in {@link #testTwoIterationsWithMapperInbetween()}.
+	 */
+	@Test
+	public void testTwoIterationsDirectlyChained() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(8);
+
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+		DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+		DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
+		depResult.print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		assertEquals(1, op.getDataSinks().size());
+		assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+
+		WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+
+		assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+		assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+	}
+
+	/**
+	 * Two delta iterations chained directly. The single sink must be fed by the second delta iteration, whose
+	 * first input must be forwarded and whose second input must be temped in a pipeline-breaking mode.
+	 */
+	@Test
+	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(8);
+
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+		DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
+		DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
+		secondResult.print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		assertEquals(1, op.getDataSinks().size());
+		assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+
+		WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+
+		assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
+		assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+	}
+
+	/**
+	 * Bulk iteration whose initial partial solution is produced by a map that declares field 0 constant. The
+	 * single sink must be fed by the bulk iteration, and every channel leaving its partial solution must be
+	 * hash partitioned.
+	 */
+	@Test
+	public void testIterationPushingWorkOut() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+		DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
+
+		doBulkIteration(input1, input2).print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		assertEquals(1, op.getDataSinks().size());
+		assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
+
+		BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+
+		// without this check the loop below would pass vacuously if the partial solution had no consumers
+		assertFalse(bipn.getPartialSolutionPlanNode().getOutgoingChannels().isEmpty());
+		for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+			assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+		}
+	}
+
+	/**
+	 * Builds a bulk iteration with at most 100 supersteps over {@code vertices}. Each superstep joins the partial
+	 * solution with {@code edges} on field 0, takes the minimum of field 1 per field-0 group, joins that back with
+	 * the partial solution on field 0, and flat-maps the joined pairs into the next partial solution.
+	 *
+	 * @param vertices the initial partial solution
+	 * @param edges the data set joined with the partial solution in every superstep
+	 * @return the result of the closed bulk iteration
+	 */
+	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+
+		DataSet<Tuple2<Long, Long>> changes = iteration
+				.join(edges).where(0).equalTo(0).with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+
+		return iteration.closeWith(changes);
+	}
+
+	/**
+	 * Builds a delta iteration with at most 100 supersteps and solution set key field 0, using {@code vertices}
+	 * both as initial solution set and as initial workset. The step function joins the workset and the solution
+	 * set with {@code edges}; its result is used both as solution set delta and as next workset.
+	 *
+	 * @param vertices the initial solution set and initial workset
+	 * @param edges the data set joined inside every superstep
+	 * @return the result of the closed delta iteration
+	 */
+	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
+
+		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
+				.projectSecond(1).types(Long.class);
+
+		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
+
+		DataSet<Tuple2<Long, Long>> candidatesDependencies =
+				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
+
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
+				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1);
+
+		DataSet<Tuple2<Long, Long>> updatedComponentId =
+				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+
+		return depIteration.closeWith(updatedComponentId, updatedComponentId);
+	}
+
+	/** Placeholder join; these tests only compile plans, so it simply returns {@code null}. */
+	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return null;
+		}
+	}
+
+	/** Placeholder flat-map over joined pairs; emits nothing. */
+	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
+		@Override
+		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
+	}
+
+	/** Identity map, used to place an operator between the two iterations. */
+	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			return value;
+		}
+	}
+
+	/** Placeholder group reduce that emits nothing; declares field 0 constant via {@code @ConstantFields}. */
+	@ConstantFields("0")
+	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+
+		@Override
+		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+	}
+
+	/** Maps {@code (x)} to {@code (x, x)}; declares field 0 constant via {@code @ConstantFields}. */
+	@ConstantFields("0")
+	public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
+			return new Tuple2<Long, Long>(value.f0, value.f0);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
deleted file mode 100644
index 5cf1425..0000000
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.compiler;
-
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-
-import org.junit.Test;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.DeltaIteration;
-import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.aggregation.Aggregations;
-import eu.stratosphere.api.java.functions.FlatMapFunction;
-import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.functions.GroupReduceFunction;
-import eu.stratosphere.api.java.functions.JoinFunction;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.compiler.plan.OptimizedPlan;
-import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class MultipleIterationsCompilerTest extends CompilerTestBase {
-
- @Test
- public void testTwoIterationsWithMapperInbetween() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoWorksetIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-
- secondResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
- // open a bulk iteration
- IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
-
- DataSet<Tuple2<Long, Long>> changes = iteration
- .join(edges).where(0).equalTo(0).with(new Join222())
- .groupBy(0).aggregate(Aggregations.MIN, 1)
- .join(iteration).where(0).equalTo(0)
- .flatMap(new FlatMapJoin());
-
- // close the bulk iteration
- return iteration.closeWith(changes);
- }
-
-
- public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
- DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
-
- DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
- .projectSecond(1).types(Long.class);
-
- DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
-
- DataSet<Tuple2<Long, Long>> candidatesDependencies =
- grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
-
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
- candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
- .with(new Join222())
- .groupBy(0).aggregate(Aggregations.MIN, 1);
-
- DataSet<Tuple2<Long, Long>> updatedComponentId =
- verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
- .flatMap(new FlatMapJoin());
-
- DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-
- return depResult;
-
- }
-
- public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
- return null;
- }
- }
-
- public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-
- @Override
- public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
- }
-
- public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
- return value;
- }
- }
-
- @ConstantFields("0")
- public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-
- @Override
- public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index bb8af9c..c9fd442 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -24,6 +24,8 @@ import eu.stratosphere.types.LongValue;
*/
public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+ private static final long serialVersionUID = 1L;
+
private static final Log log = LogFactory.getLog(WorksetEmptyConvergenceCriterion.class);
public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 6491749..638dd1d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -78,7 +78,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
switch (strategy) {
case FORWARD:
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
case BROADCAST:
@@ -103,7 +102,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
case PARTITION_RANDOM:
return robin(numberOfChannels);
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
return hashPartitionDefault(record.getInstance(), numberOfChannels);
case PARTITION_RANGE:
return rangePartition(record.getInstance(), numberOfChannels);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index 7fe35b4..efdb267 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -83,7 +83,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
switch (strategy) {
case FORWARD:
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
this.channels = new int[1];
@@ -110,7 +109,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
case PARTITION_RANDOM:
return robin(numberOfChannels);
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
return hashPartitionDefault(record, numberOfChannels);
case PARTITION_RANGE:
return rangePartition(record, numberOfChannels);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index 2b50779..a86c209 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -40,12 +40,6 @@ public enum ShipStrategyType {
PARTITION_HASH(true, true, true),
/**
- * Repartitioning the data within a local instance with a hash function. Happens for example when the
- * intra-node degree-of-parallelism is increased.
- */
- PARTITION_LOCAL_HASH(false, true, true),
-
- /**
* Partitioning the data in ranges according to a total order.
*/
PARTITION_RANGE(true, true, true),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index c13ddc8..d1b249a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -45,7 +45,7 @@ public class KMeansForTest implements Program{
}
// set up execution environment
- ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+ ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
// get input data
DataSet<Point> points = getPointDataSet(env);