You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:16 UTC

[6/6] git commit: [FLINK-935] Fix compiler logic that pushed work out of the iteration loop.

[FLINK-935] Fix compiler logic that pushed work out of the iteration loop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/31a37393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/31a37393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/31a37393

Branch: refs/heads/master
Commit: 31a373930bce27a0d4abc2cbcff4284d5af9002a
Parents: 229754d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 19:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200

----------------------------------------------------------------------
 .../compiler/costs/CostEstimator.java           |   2 -
 .../compiler/dag/BulkIterationNode.java         |  12 +-
 .../compiler/dag/BulkPartialSolutionNode.java   |   2 +-
 .../stratosphere/compiler/dag/DataSinkNode.java |   8 +-
 .../compiler/dag/SingleInputNode.java           |  11 +-
 .../stratosphere/compiler/dag/TwoInputNode.java |  16 +-
 .../compiler/dag/WorksetIterationNode.java      |   2 +-
 .../RequestedGlobalProperties.java              |  46 +---
 .../RequestedLocalProperties.java               |  14 +-
 .../eu/stratosphere/compiler/plan/Channel.java  |  76 +++---
 .../plandump/PlanJSONDumpGenerator.java         |   3 -
 .../plantranslate/NepheleJobGraphGenerator.java |   1 -
 .../pact/compiler/DOPChangeTest.java            |   4 +-
 .../pact/compiler/IterationsCompilerTest.java   | 265 +++++++++++++++++++
 .../MultipleIterationsCompilerTest.java         | 221 ----------------
 .../WorksetEmptyConvergenceCriterion.java       |   2 +
 .../pact/runtime/shipping/OutputEmitter.java    |   2 -
 .../runtime/shipping/RecordOutputEmitter.java   |   2 -
 .../pact/runtime/shipping/ShipStrategyType.java |   6 -
 .../test/util/testjar/KMeansForTest.java        |   2 +-
 20 files changed, 345 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 492204b..11fb45b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -94,8 +94,6 @@ public abstract class CostEstimator {
 			case FORWARD:
 //				costs.addHeuristicNetworkCost(channel.getMaxDepth());
 				break;
-			case PARTITION_LOCAL_HASH:
-				break;
 			case PARTITION_RANDOM:
 				addRandomPartitioningCost(channel, costs);
 				break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index c547da9..b60f427 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -279,13 +279,17 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 3) Get the alternative plans
 		List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
 		
-		// 4) Throw away all that are not compatible with the properties currently requested to the
-		//    initial partial solution
+		// 4) Make sure that the beginning of the step function does not assume properties that 
+		//    are not also produced by the end of the step function.
+
 		for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
 			PlanNode candidate = planDeleter.next();
-			if (!(globPropsReq.isMetBy(candidate.getGlobalProperties()) && locPropsReq.isMetBy(candidate.getLocalProperties()))) {
-				planDeleter.remove();
+			
+			// quick-check if the properties at the end of the step function are the same as at the beginning
+			if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
+				continue;
 			}
+			planDeleter.remove();
 		}
 		
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
index 449a2fc..592768b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
@@ -43,7 +43,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
 		if (this.cachedPlans != null) {
 			throw new IllegalStateException();
 		} else {
-			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "BulkPartialSolution("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 3f7b224..15c7670 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -31,7 +31,6 @@ import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.PlanNode;
 import eu.stratosphere.compiler.plan.SinkPlanNode;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -210,12 +209,7 @@ public class DataSinkNode extends OptimizerNode {
 				for (RequestedLocalProperties lp : ips.getLocalProperties()) {
 					Channel c = new Channel(p);
 					gp.parameterizeChannel(c, globalDopChange, localDopChange);
-
-					if (lp.isMetBy(c.getLocalPropertiesAfterShippingOnly())) {
-						c.setLocalStrategy(LocalStrategy.NONE);
-					} else {
-						lp.parameterizeChannel(c);
-					}
+					lp.parameterizeChannel(c);
 					
 					// no need to check whether the created properties meet what we need in case
 					// of ordering or global ordering, because the only interesting properties we have

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 2a9e5c6..fd4b76a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -42,7 +42,6 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
 import eu.stratosphere.compiler.util.NoOpUnaryUdfOp;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -357,21 +356,17 @@ public abstract class SingleInputNode extends OptimizerNode {
 	protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
 			List<PlanNode> target, CostEstimator estimator)
 	{
-		final LocalProperties lp = template.getLocalPropertiesAfterShippingOnly();
 		for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
 			final Channel in = template.clone();
-			if (ilp.isMetBy(lp)) {
-				in.setLocalStrategy(LocalStrategy.NONE);
-			} else {
-				ilp.parameterizeChannel(in);
-			}
+			ilp.parameterizeChannel(in);
 			
 			// instantiate a candidate, if the instantiated local properties meet one possible local property set
+			outer:
 			for (OperatorDescriptorSingle dps: getPossibleProperties()) {
 				for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
 					if (ilps.isMetBy(in.getLocalProperties())) {
 						instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
-						break;
+						break outer;
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 5046ceb..cccbeba 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -51,7 +51,6 @@ import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
 import eu.stratosphere.pact.runtime.task.DamBehavior;
 import eu.stratosphere.pact.runtime.task.DriverStrategy;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.util.Visitor;
 
 /**
@@ -488,24 +487,13 @@ public abstract class TwoInputNode extends OptimizerNode {
 			RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
 			List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
 	{
-		final LocalProperties lp1 = template1.getLocalPropertiesAfterShippingOnly();
-		final LocalProperties lp2 = template2.getLocalPropertiesAfterShippingOnly();
-		
 		for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
 			final Channel in1 = template1.clone();
-			if (ilp1.isMetBy(lp1)) {
-				in1.setLocalStrategy(LocalStrategy.NONE);
-			} else {
-				ilp1.parameterizeChannel(in1);
-			}
+			ilp1.parameterizeChannel(in1);
 			
 			for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
 				final Channel in2 = template2.clone();
-				if (ilp2.isMetBy(lp2)) {
-					in2.setLocalStrategy(LocalStrategy.NONE);
-				} else {
-					ilp2.parameterizeChannel(in2);
-				}
+				ilp2.parameterizeChannel(in2);
 				
 				allPossibleLoop:
 				for (OperatorDescriptorDual dps: this.possibleProperties) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index be02e01..94580ad 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -397,7 +397,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
 
 		ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, inputsMerged1);
+		mergeLists(result1, result2, inputsMerged1); // this method also sets which branches are joined here (in the head)
 		
 		addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 06d7371..3935f5e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -17,8 +17,6 @@ import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.operators.Ordering;
 import eu.stratosphere.api.common.operators.util.FieldSet;
 import eu.stratosphere.compiler.CompilerException;
-import eu.stratosphere.compiler.costs.CostEstimator;
-import eu.stratosphere.compiler.costs.Costs;
 import eu.stratosphere.compiler.dag.OptimizerNode;
 import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.util.Utils;
@@ -31,8 +29,8 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
  * Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
  * or an FieldSet with the hash partitioning columns.
  */
-public final class RequestedGlobalProperties implements Cloneable
-{
+public final class RequestedGlobalProperties implements Cloneable {
+	
 	private PartitioningProperty partitioning;	// the type partitioning
 	
 	private FieldSet partitioningFields;		// the fields which are partitioned
@@ -230,20 +228,12 @@ public final class RequestedGlobalProperties implements Cloneable
 		
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
-		if (!globalDopChange && isMetBy(inGlobals)) {
-			if (localDopChange) {
-				// if the local degree of parallelism changes, we need to adjust
-				if (inGlobals.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
-					// to preserve the hash partitioning, we need to locally hash re-partition
-					channel.setShipStrategy(ShipStrategyType.PARTITION_LOCAL_HASH, inGlobals.getPartitioningFields());
-					return;
-				}
-				// else fall though
-			} else {
-				// we meet already everything, so go forward
-				channel.setShipStrategy(ShipStrategyType.FORWARD);
-				return;
-			}
+		if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
+			channel.setRequiredGlobalProps(this);
+			
+			// we meet already everything, so go forward
+			channel.setShipStrategy(ShipStrategyType.FORWARD);
+			return;
 		}
 		
 		// if we fall through the conditions until here, we need to re-establish
@@ -266,26 +256,6 @@ public final class RequestedGlobalProperties implements Cloneable
 				throw new CompilerException();
 		}
 	}
-	
-	public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, OptimizerNode source, OptimizerNode target) {
-		if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) {
-			return;
-		} else {
-			switch (this.partitioning) {
-				case FULL_REPLICATION:
-					estimator.addBroadcastCost(source, target.getDegreeOfParallelism(), to);
-				case ANY_PARTITIONING:
-				case HASH_PARTITIONED:
-					estimator.addHashPartitioningCost(source, to);
-					break;
-				case RANGE_PARTITIONED:
-					estimator.addRangePartitionCost(source, to);
-					break;
-				default:
-					throw new CompilerException();
-			}
-		}
-	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index 7e67f5f..e714cea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -187,13 +187,23 @@ public class RequestedLocalProperties implements Cloneable {
 	 * @param channel The channel to parameterize.
 	 */
 	public void parameterizeChannel(Channel channel) {
-		if (this.ordering != null) {
+		LocalProperties current = channel.getLocalProperties();
+		
+		if (isMetBy(current)) {
+			// we are met. record that this is needed.
+			channel.setRequiredLocalProps(this);
+		}
+		else if (this.ordering != null) {
 			channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
-		} else if (this.groupedFields != null) {
+		}
+		else if (this.groupedFields != null) {
 			boolean[] dirs = new boolean[this.groupedFields.size()];
 			Arrays.fill(dirs, true);
 			channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs);
 		}
+		else {
+			channel.setLocalStrategy(LocalStrategy.NONE);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 3377caf..1cd6a36 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -22,6 +22,8 @@ import eu.stratosphere.compiler.dag.EstimateProvider;
 import eu.stratosphere.compiler.dag.TempMode;
 import eu.stratosphere.compiler.dataproperties.GlobalProperties;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
 import eu.stratosphere.compiler.plandump.DumpableConnection;
 import eu.stratosphere.compiler.util.Utils;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
@@ -48,6 +50,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	private boolean[] localSortOrder;
 	
+	private RequestedGlobalProperties requiredGlobalProps;
+	
+	private RequestedLocalProperties requiredLocalProps;
+	
 	private GlobalProperties globalProps;
 	
 	private LocalProperties localProps;
@@ -329,6 +335,22 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	// --------------------------------------------------------------------------------------------
 	
 
+	public RequestedGlobalProperties getRequiredGlobalProps() {
+		return requiredGlobalProps;
+	}
+
+	public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+		this.requiredGlobalProps = requiredGlobalProps;
+	}
+
+	public RequestedLocalProperties getRequiredLocalProps() {
+		return requiredLocalProps;
+	}
+
+	public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+		this.requiredLocalProps = requiredLocalProps;
+	}
+
 	public GlobalProperties getGlobalProperties() {
 		if (this.globalProps == null) {
 			this.globalProps = this.source.getGlobalProperties().clone();
@@ -348,19 +370,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 				case PARTITION_RANDOM:
 					this.globalProps.reset();
 					break;
-				case PARTITION_LOCAL_HASH:
-					if (getSource().getGlobalProperties().isPartitionedOnFields(this.shipKeys)) {
-						// after a local hash partitioning, we can only state that the data is somehow
-						// partitioned. even if we had a hash partitioning before over 8 partitions,
-						// locally rehashing that onto 16 partitions (each one partition into two) gives you
-						// a different result than directly hashing to 16 partitions. the hash-partitioning
-						// property is only valid, if the assumed built in hash function is directly used.
-						// hence, we can only state that this is some form of partitioning.
-						this.globalProps.setAnyPartitioning(this.shipKeys);
-					} else {
-						this.globalProps.reset();
-					}
-					break;
 				case NONE:
 					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
 			}
@@ -371,12 +380,13 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	public LocalProperties getLocalProperties() {
 		if (this.localProps == null) {
-			this.localProps = getLocalPropertiesAfterShippingOnly().clone();
+			computeLocalPropertiesAfterShippingOnly();
 			switch (this.localStrategy) {
 				case NONE:
 					break;
 				case SORT:
 				case COMBININGSORT:
+					this.localProps = new LocalProperties();
 					this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
 					break;
 				default:
@@ -387,25 +397,19 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		return this.localProps;
 	}
 	
-	public LocalProperties getLocalPropertiesAfterShippingOnly() {
-		if (this.shipStrategy == ShipStrategyType.FORWARD) {
-			return this.source.getLocalProperties();
-		} else {
-			final LocalProperties props = this.source.getLocalProperties().clone();
-			switch (this.shipStrategy) {
-				case BROADCAST:
-				case PARTITION_HASH:
-				case PARTITION_RANGE:
-				case PARTITION_RANDOM:
-					props.reset();
-					break;
-				case PARTITION_LOCAL_HASH:
-				case FORWARD:
-					break;
-				case NONE:
-					throw new CompilerException("ShipStrategy has not yet been set.");
-			}
-			return props;
+	private void computeLocalPropertiesAfterShippingOnly() {
+		switch (this.shipStrategy) {
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+				this.localProps = new LocalProperties();
+				break;
+			case FORWARD:
+				this.localProps = this.source.getLocalProperties();
+				break;
+			case NONE:
+				throw new CompilerException("ShipStrategy has not yet been set.");
 		}
 	}
 	
@@ -423,8 +427,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		// some strategies globally reestablish properties
 		switch (this.shipStrategy) {
 		case FORWARD:
-		case PARTITION_LOCAL_HASH:
-			throw new CompilerException("Cannot use FORWARD or LOCAL_HASH strategy between operations " +
+			throw new CompilerException("Cannot use FORWARD strategy between operations " +
 					"with different number of parallel instances.");
 		case NONE: // excluded by sanity check. lust here for verification check completion
 		case BROADCAST:
@@ -452,8 +455,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		case FORWARD:
 			this.globalProps.reset();
 			return;
-		case NONE: // excluded by sanity check. lust here for verification check completion
-		case PARTITION_LOCAL_HASH:
+		case NONE: // excluded by sanity check. just here to complete the switch and silence compiler warnings
 		case BROADCAST:
 		case PARTITION_HASH:
 		case PARTITION_RANGE:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index 183ee2f..e89a18b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -319,9 +319,6 @@ public class PlanJSONDumpGenerator {
 						case PARTITION_RANGE:
 							shipStrategy = "Range Partition";
 							break;
-						case PARTITION_LOCAL_HASH:
-							shipStrategy = "Hash Partition (local)";
-							break;
 						case PARTITION_RANDOM:
 							shipStrategy = "Redistribute";
 							break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 4817095..aebf2cf 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1041,7 +1041,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 		switch (channel.getShipStrategy()) {
 			case FORWARD:
-			case PARTITION_LOCAL_HASH:
 				distributionPattern = DistributionPattern.POINTWISE;
 				channelType = ChannelType.NETWORK;
 				break;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 9f53c30..273c42c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,8 +209,8 @@ public class DOPChangeTest extends CompilerTestBase {
 		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
 		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
 		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_LOCAL_HASH, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, reduceIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
new file mode 100644
index 0000000..a123099
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -0,0 +1,265 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+
+/** Optimizer tests for programs with bulk and delta iterations: each test compiles a plan via compileNoStats and asserts on the resulting iteration plan node. */
+@SuppressWarnings({"serial", "unchecked"})
+public class IterationsCompilerTest extends CompilerTestBase {
+	/** Bulk iteration, then DummyMap, then delta iteration: expects PARTITION_HASH on input 1 of the delta iteration and a pipeline-breaking temp mode on input 2. */
+	@Test
+	public void testTwoIterationsWithMapperInbetween() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			// single-element data sets stand in for the vertex and edge inputs
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
+			
+			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
+			
+			depResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// the delta iteration must be the direct source of the plan's only sink
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	/** Bulk iteration feeding a delta iteration with no operator in between: expects PARTITION_HASH on input 1 of the delta iteration and a pipeline-breaking temp mode on input 2. */
+	@Test
+	public void testTwoIterationsDirectlyChained() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
+			
+			depResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			// the cast is safe: the instanceof assertion above has already passed
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	/** Two delta iterations chained directly: expects FORWARD on input 1 of the second iteration and a pipeline-breaking temp mode on input 2. */
+	@Test
+	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+			
+			DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
+			
+			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
+			
+			secondResult.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			// NOTE(review): FORWARD presumably because the first delta iteration's result is already partitioned suitably - confirm
+			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
+			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	/** Bulk iteration over CSV-file inputs: expects PARTITION_HASH on every outgoing channel of the partial solution node (NOTE(review): intent inferred from the commit subject, FLINK-935). */
+	@Test
+	public void testIterationPushingWorkOut() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			// unlike the other tests, no degree of parallelism is set on the environment here
+			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+			
+			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
+			
+			doBulkIteration(input1, input2).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// prints the optimized plan as JSON to stdout
+			System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
+			
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
+			
+			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			// every channel leaving the partial solution node must be hash partitioned
+			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	/** Builds a bulk iteration over {@code vertices}: join with {@code edges} on field 0, MIN of field 1 per key 0, join back with the iterative data set, then FlatMapJoin. */
+	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+		
+		// open a bulk iteration
+		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+		
+		DataSet<Tuple2<Long, Long>> changes = iteration
+				.join(edges).where(0).equalTo(0).with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+		
+		// close the bulk iteration
+		return iteration.closeWith(changes);
+	}
+				
+	/** Builds a delta iteration that passes {@code vertices} as both the receiver and first argument of iterateDelta and closes with the same data set for both closeWith arguments. */
+	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
+		// workset joined with edges on field 0, keeping only field 1 of the edge side
+		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
+				.projectSecond(1).types(Long.class);
+		
+		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
+		
+		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
+				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
+		// joined with the solution set on field 0, then MIN of field 1 per key 0
+		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
+				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new Join222())
+				.groupBy(0).aggregate(Aggregations.MIN, 1);
+		
+		DataSet<Tuple2<Long, Long>> updatedComponentId = 
+				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+		
+		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
+		
+		return depResult;
+		
+	}
+	/** Join stub that always returns null; the tests in this class compile plans and never call env.execute(). */
+	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return null;
+		}
+	}
+	/** Flat-map stub over joined tuple pairs; its body is empty, so it emits nothing. */
+	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+		
+		@Override
+		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
+	}
+	/** Identity mapper: returns its input tuple unchanged. */
+	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+			return value;
+		}
+	}
+	/** Group-reduce stub with an empty body; annotated ConstantFields("0") (NOTE(review): presumably marks field 0 as forwarded unchanged - confirm). */
+	@ConstantFields("0")
+	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+		
+		@Override
+		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+	}
+	/** Maps a Tuple1 to a Tuple2 that carries the single input value in both fields. */
+	@ConstantFields("0")
+	public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
+			return new Tuple2<Long, Long>(value.f0, value.f0);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
deleted file mode 100644
index 5cf1425..0000000
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.compiler;
-
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-
-import org.junit.Test;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.DeltaIteration;
-import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.aggregation.Aggregations;
-import eu.stratosphere.api.java.functions.FlatMapFunction;
-import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.functions.GroupReduceFunction;
-import eu.stratosphere.api.java.functions.JoinFunction;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.compiler.plan.OptimizedPlan;
-import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class MultipleIterationsCompilerTest extends CompilerTestBase {
-
-	@Test
-	public void testTwoIterationsWithMapperInbetween() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-			
-			depResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-			
-			DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-			
-			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-			
-			secondResult.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			assertEquals(1, op.getDataSinks().size());
-			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-			
-			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
-			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-		
-		// open a bulk iteration
-		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
-		
-		DataSet<Tuple2<Long, Long>> changes = iteration
-				.join(edges).where(0).equalTo(0).with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(iteration).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		// close the bulk iteration
-		return iteration.closeWith(changes);
-	}
-				
-		
-	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
-		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
-				
-		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
-				.projectSecond(1).types(Long.class);
-		
-		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
-		
-		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
-				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
-		
-		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
-				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.with(new Join222())
-				.groupBy(0).aggregate(Aggregations.MIN, 1);
-		
-		DataSet<Tuple2<Long, Long>> updatedComponentId = 
-				verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
-				.flatMap(new FlatMapJoin());
-		
-		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-		
-		return depResult;
-		
-	}
-	
-	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-			return null;
-		}
-	}
-	
-	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-		
-		@Override
-		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
-	}
-	
-	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-			return value;
-		}
-	}
-	
-	@ConstantFields("0")
-	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-		
-		@Override
-		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index bb8af9c..c9fd442 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -24,6 +24,8 @@ import eu.stratosphere.types.LongValue;
  */
 public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
+	private static final long serialVersionUID = 1L;
+
 	private static final Log log = LogFactory.getLog(WorksetEmptyConvergenceCriterion.class);
 	
 	public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 6491749..638dd1d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -78,7 +78,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		switch (strategy) {
 		case FORWARD:
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
 		case BROADCAST:
@@ -103,7 +102,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		case PARTITION_RANDOM:
 			return robin(numberOfChannels);
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 			return hashPartitionDefault(record.getInstance(), numberOfChannels);
 		case PARTITION_RANGE:
 			return rangePartition(record.getInstance(), numberOfChannels);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index 7fe35b4..efdb267 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -83,7 +83,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
 		switch (strategy) {
 		case FORWARD:
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
 			this.channels = new int[1];
@@ -110,7 +109,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
 		case PARTITION_RANDOM:
 			return robin(numberOfChannels);
 		case PARTITION_HASH:
-		case PARTITION_LOCAL_HASH:
 			return hashPartitionDefault(record, numberOfChannels);
 		case PARTITION_RANGE:
 			return rangePartition(record, numberOfChannels);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index 2b50779..a86c209 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -40,12 +40,6 @@ public enum ShipStrategyType {
 	PARTITION_HASH(true, true, true),
 	
 	/**
-	 * Repartitioning the data within a local instance with a hash function. Happens for example when the
-	 * intra-node degree-of-parallelism is increased. 
-	 */
-	PARTITION_LOCAL_HASH(false, true, true),
-	
-	/**
 	 * Partitioning the data in ranges according to a total order.
 	 */
 	PARTITION_RANGE(true, true, true),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index c13ddc8..d1b249a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -45,7 +45,7 @@ public class KMeansForTest implements Program{
 		}
 	
 		// set up execution environment
-		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
 		
 		// get input data
 		DataSet<Point> points = getPointDataSet(env);