You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:48 UTC

[09/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
new file mode 100644
index 0000000..c2e81a8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Visitor that computes the interesting properties for each node in the optimizer DAG. On its recursive
+ * depth-first descend, it propagates all interesting properties top-down.
+ */
+public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
+
+	private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
+
+	/**
+	 * Creates a new visitor that computes the interesting properties for all nodes in the plan.
+	 * It uses the given cost estimator used to compute the maximal costs for an interesting property.
+	 *
+	 * @param estimator
+	 *        The cost estimator to estimate the maximal costs for interesting properties.
+	 */
+	public InterestingPropertyVisitor(CostEstimator estimator) {
+		this.estimator = estimator;
+	}
+
+	@Override
+	public boolean preVisit(OptimizerNode node) {
+		// The interesting properties must be computed on the descend. In case a node has multiple outputs,
+		// that computation must happen during the last descend.
+
+		if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
+			node.computeUnionOfInterestingPropertiesFromSuccessors();
+			node.computeInterestingPropertiesForInputs(this.estimator);
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public void postVisit(OptimizerNode visitable) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
new file mode 100644
index 0000000..58aa3c1
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This visitor traverses the selected execution plan and finalizes it:
+ *
+ * <ul>
+ *     <li>The graph of nodes is double-linked (links from child to parent are inserted).</li>
+ *     <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.</li>
+ *     <li>Relative memory fractions are assigned to all nodes.</li>
+ *     <li>All nodes are collected into a set.</li>
+ * </ul>
+ */
+public class PlanFinalizer implements Visitor<PlanNode> {
+
+	private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
+
+	private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
+
+	private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
+
+	private final Deque<IterationPlanNode> stackOfIterationNodes;
+
+	private int memoryConsumerWeights; // a counter of all memory consumers
+
+	/**
+	 * Creates a new plan finalizer.
+	 */
+	public PlanFinalizer() {
+		this.allNodes = new HashSet<PlanNode>();
+		this.sources = new ArrayList<SourcePlanNode>();
+		this.sinks = new ArrayList<SinkPlanNode>();
+		this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
+	}
+
+	public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
+		this.memoryConsumerWeights = 0;
+
+		// traverse the graph
+		for (SinkPlanNode node : sinks) {
+			node.accept(this);
+		}
+
+		// assign the memory to each node
+		if (this.memoryConsumerWeights > 0) {
+			for (PlanNode node : this.allNodes) {
+				// assign memory to the driver strategy of the node
+				final int consumerWeight = node.getMemoryConsumerWeight();
+				if (consumerWeight > 0) {
+					final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
+					node.setRelativeMemoryPerSubtask(relativeMem);
+					if (Optimizer.LOG.isDebugEnabled()) {
+						Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
+							node.getProgramOperator().getName() + ".");
+					}
+				}
+
+				// assign memory to the local and global strategies of the channels
+				for (Channel c : node.getInputs()) {
+					if (c.getLocalStrategy().dams()) {
+						final double relativeMem = 1.0 / this.memoryConsumerWeights;
+						c.setRelativeMemoryLocalStrategy(relativeMem);
+						if (Optimizer.LOG.isDebugEnabled()) {
+							Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
+									"instance of " + c + ".");
+						}
+					}
+					if (c.getTempMode() != TempMode.NONE) {
+						final double relativeMem = 1.0/ this.memoryConsumerWeights;
+						c.setRelativeTempMemory(relativeMem);
+						if (Optimizer.LOG.isDebugEnabled()) {
+							Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
+									"table for " + c + ".");
+						}
+					}
+				}
+			}
+		}
+		return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
+	}
+
+	@Override
+	public boolean preVisit(PlanNode visitable) {
+		// if we come here again, prevent a further descend
+		if (!this.allNodes.add(visitable)) {
+			return false;
+		}
+
+		if (visitable instanceof SinkPlanNode) {
+			this.sinks.add((SinkPlanNode) visitable);
+		}
+		else if (visitable instanceof SourcePlanNode) {
+			this.sources.add((SourcePlanNode) visitable);
+		}
+		else if (visitable instanceof BinaryUnionPlanNode) {
+			BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
+			if (unionNode.unionsStaticAndDynamicPath()) {
+				unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
+			}
+		}
+		else if (visitable instanceof BulkPartialSolutionPlanNode) {
+			// tell the partial solution about the iteration node that contains it
+			final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
+			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+			// sanity check!
+			if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
+				throw new CompilerException("Bug: Error finalizing the plan. " +
+						"Cannot associate the node for a partial solutions with its containing iteration.");
+			}
+			pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
+		}
+		else if (visitable instanceof WorksetPlanNode) {
+			// tell the partial solution about the iteration node that contains it
+			final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
+			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+			// sanity check!
+			if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+				throw new CompilerException("Bug: Error finalizing the plan. " +
+						"Cannot associate the node for a partial solutions with its containing iteration.");
+			}
+			wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+		}
+		else if (visitable instanceof SolutionSetPlanNode) {
+			// tell the partial solution about the iteration node that contains it
+			final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
+			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+			// sanity check!
+			if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+				throw new CompilerException("Bug: Error finalizing the plan. " +
+						"Cannot associate the node for a partial solutions with its containing iteration.");
+			}
+			sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+		}
+
+		// double-connect the connections. previously, only parents knew their children, because
+		// one child candidate could have been referenced by multiple parents.
+		for (Channel conn : visitable.getInputs()) {
+			conn.setTarget(visitable);
+			conn.getSource().addOutgoingChannel(conn);
+		}
+
+		for (Channel c : visitable.getBroadcastInputs()) {
+			c.setTarget(visitable);
+			c.getSource().addOutgoingChannel(c);
+		}
+
+		// count the memory consumption
+		this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
+		for (Channel c : visitable.getInputs()) {
+			if (c.getLocalStrategy().dams()) {
+				this.memoryConsumerWeights++;
+			}
+			if (c.getTempMode() != TempMode.NONE) {
+				this.memoryConsumerWeights++;
+			}
+		}
+		for (Channel c : visitable.getBroadcastInputs()) {
+			if (c.getLocalStrategy().dams()) {
+				this.memoryConsumerWeights++;
+			}
+			if (c.getTempMode() != TempMode.NONE) {
+				this.memoryConsumerWeights++;
+			}
+		}
+
+		// pass the visitor to the iteraton's step function
+		if (visitable instanceof IterationPlanNode) {
+			// push the iteration node onto the stack
+			final IterationPlanNode iterNode = (IterationPlanNode) visitable;
+			this.stackOfIterationNodes.addLast(iterNode);
+
+			// recurse
+			((IterationPlanNode) visitable).acceptForStepFunction(this);
+
+			// pop the iteration node from the stack
+			this.stackOfIterationNodes.removeLast();
+		}
+		return true;
+	}
+
+	@Override
+	public void postVisit(PlanNode visitable) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
new file mode 100644
index 0000000..c0dc4dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A traversal that goes over the program data flow of an iteration and makes the nodes
+ * that depend on the partial solution (the data set recomputed in each iteration) as "dynamic"
+ * and the other nodes as "static".
+ */
+public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
+
+	private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
+
+	private final int costWeight;
+
+	public StaticDynamicPathIdentifier(int costWeight) {
+		this.costWeight = costWeight;
+	}
+
+	@Override
+	public boolean preVisit(OptimizerNode visitable) {
+		return this.seenBefore.add(visitable);
+	}
+
+	@Override
+	public void postVisit(OptimizerNode visitable) {
+		visitable.identifyDynamicPath(this.costWeight);
+
+		// check that there is no nested iteration on the dynamic path
+		if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
+			throw new CompilerException("Nested iterations are currently not supported.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
new file mode 100644
index 0000000..d359490
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.util.Visitor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A traversal that checks if the Workset of a delta iteration is used in the data flow
+ * of its step function.
+ */
+public class StepFunctionValidator implements Visitor<Operator<?>> {
+
+	private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
+
+	private boolean foundWorkset;
+
+	@Override
+	public boolean preVisit(Operator<?> visitable) {
+		if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) {
+			foundWorkset = true;
+		}
+
+		return (!foundWorkset) && seenBefore.add(visitable);
+	}
+
+	@Override
+	public void postVisit(Operator<?> visitable) {}
+
+	public boolean hasFoundWorkset() {
+		return foundWorkset;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
new file mode 100644
index 0000000..cd8766c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the various traversals over the program plan and the
+ * optimizer DAG (directed acyclic graph) that are made in the course of
+ * the optimization.
+ *
+ * The traversals are mostly implemented as a {@link org.apache.flink.util.Visitor} that
+ * traversed the program flow.
+ */
+package org.apache.flink.optimizer.traversals;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
new file mode 100644
index 0000000..5110849
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Key;
+
+
+public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator {
+
+	public NoOpBinaryUdfOp(TypeInformation<OUT> type) {
+		super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		return (Class<? extends Key<?>>[]) new Class[0];
+	}
+
+	@Override
+	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+		throw new UnsupportedOperationException();
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
new file mode 100644
index 0000000..cc4a4d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Key;
+
+
+public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator {
+	
+	@SuppressWarnings("rawtypes")
+	public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
+	
+	private NoOpUnaryUdfOp() {
+		// pass null here because we override getOutputType to return type
+		// of input operator
+		super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		return (Class<? extends Key<?>>[]) new Class[0];
+	}
+
+	@Override
+	public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() {
+		TypeInformation<OUT> previousOut = input.getOperatorInfo().getOutputType();
+		return new UnaryOperatorInformation<OUT, OUT>(previousOut, previousOut);
+	}
+
+	@Override
+	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+		return inputData;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
new file mode 100644
index 0000000..d8f33a2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.util;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+
+
+/**
+ * 
+ */
+public class Utils
+{
+	public static final FieldList createOrderedFromSet(FieldSet set) {
+		if (set instanceof FieldList) {
+			return (FieldList) set;
+		} else {
+			final int[] cols = set.toArray();
+			Arrays.sort(cols);
+			return new FieldList(cols);
+		}
+	}
+	
+	public static final Ordering createOrdering(FieldList fields, boolean[] directions) {
+		final Ordering o = new Ordering();
+		for (int i = 0; i < fields.size(); i++) {
+			o.appendOrdering(fields.get(i), null, directions == null || directions[i] ? Order.ASCENDING : Order.DESCENDING);
+		}
+		return o;
+	}
+	
+	public static final Ordering createOrdering(FieldList fields) {
+		final Ordering o = new Ordering();
+		for (int i = 0; i < fields.size(); i++) {
+			o.appendOrdering(fields.get(i), null, Order.ANY);
+		}
+		return o;
+	}
+	
+	public static boolean[] getDirections(Ordering o, int numFields) {
+		final boolean[] dirs = o.getFieldSortDirections();
+		if (dirs.length == numFields) {
+			return dirs;
+		} else if (dirs.length > numFields) {
+			final boolean[] subSet = new boolean[numFields];
+			System.arraycopy(dirs, 0, subSet, 0, numFields);
+			return subSet;
+		} else {
+			throw new CompilerException();
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * No instantiation.
+	 */
+	private Utils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
new file mode 100644
index 0000000..1e4bafb
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.CrossOperator;
+import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
+import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.util.DummyCrossStub;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+/**
+* Tests that validate optimizer choices when using operators that are requesting certain specific execution
+* strategies.
+*/
+@SuppressWarnings({"serial", "deprecation"})
+public class AdditionalOperatorsTest extends CompilerTestBase {
+
+	@Test
+	public void testCrossWithSmall() {
+		// construct the plan
+		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
+		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
+		
+		CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
+				.input1(source1).input2(source2)
+				.name("Cross").build();
+	
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
+		
+		Plan plan = new Plan(sink);
+		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+		
+		
+		try {
+			OptimizedPlan oPlan = compileNoStats(plan);
+			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
+			
+			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
+			Channel in1 = crossPlanNode.getInput1();
+			Channel in2 = crossPlanNode.getInput2();
+			
+			assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
+			assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
+		} catch(CompilerException ce) {
+			ce.printStackTrace();
+			fail("The pact compiler is unable to compile this plan correctly.");
+		}
+	}
+	
+	@Test
+	public void testCrossWithLarge() {
+		// construct the plan
+		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
+		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
+		
+		CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
+				.input1(source1).input2(source2)
+				.name("Cross").build();
+	
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
+		
+		Plan plan = new Plan(sink);
+		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+		
+		
+		try {
+			OptimizedPlan oPlan = compileNoStats(plan);
+			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
+			
+			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
+			Channel in1 = crossPlanNode.getInput1();
+			Channel in2 = crossPlanNode.getInput2();
+			
+			assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy());
+		} catch(CompilerException ce) {
+			ce.printStackTrace();
+			fail("The pact compiler is unable to compile this plan correctly.");
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
new file mode 100644
index 0000000..916aa27
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.record.operators.BulkIteration;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.CrossOperator;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.DummyCoGroupStub;
+import org.apache.flink.optimizer.util.DummyCrossStub;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class BranchingPlansCompilerTest extends CompilerTestBase {
+	
+	
+	@Test
+	public void testCostComputationWithMultipleDataSinks() {
+		final int SINKS = 5;
+	
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Long> source = env.generateSequence(1, 10000);
+
+			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
+
+			for (int sink = 0; sink < SINKS; sink++) {
+				mappedA.output(new DiscardingOutputFormat<Long>());
+				mappedC.output(new DiscardingOutputFormat<Long>());
+			}
+
+			Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
+			OptimizedPlan oPlan = compileNoStats(plan);
+
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+
+	/**
+	 * 
+	 * <pre>
+	 *                (SRC A)  
+	 *                   |
+	 *                (MAP A)
+	 *             /         \   
+	 *          (MAP B)      (MAP C)
+	 *           /           /     \
+	 *        (SINK A)    (SINK B)  (SINK C)
+	 * </pre>
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testBranchingWithMultipleDataSinks2() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Long> source = env.generateSequence(1, 10000);
+
+			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
+			DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
+
+			mappedB.output(new DiscardingOutputFormat<Long>());
+			mappedC.output(new DiscardingOutputFormat<Long>());
+			mappedC.output(new DiscardingOutputFormat<Long>());
+
+			Plan plan = env.createProgramPlan();
+			Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
+
+			OptimizedPlan oPlan = compileNoStats(plan);
+
+			// ---------- check the optimizer plan ----------
+
+			// number of sinks
+			assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
+
+			// remove matching sinks to check relation
+			for (SinkPlanNode sink : oPlan.getDataSinks()) {
+				assertTrue(sinks.remove(sink.getProgramOperator()));
+			}
+			assertTrue(sinks.isEmpty());
+
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+
+	/**
+	 * <pre>
+	 *                              SINK
+	 *                               |
+	 *                            COGROUP
+	 *                        +---/    \----+
+	 *                       /               \
+	 *                      /             MATCH10
+	 *                     /               |    \
+	 *                    /                |  MATCH9
+	 *                MATCH5               |  |   \
+	 *                |   \                |  | MATCH8
+	 *                | MATCH4             |  |  |   \
+	 *                |  |   \             |  |  | MATCH7
+	 *                |  | MATCH3          |  |  |  |   \
+	 *                |  |  |   \          |  |  |  | MATCH6
+	 *                |  |  | MATCH2       |  |  |  |  |  |
+	 *                |  |  |  |   \       +--+--+--+--+--+
+	 *                |  |  |  | MATCH1            MAP 
+	 *                \  |  |  |  |  | /-----------/
+	 *                (DATA SOURCE ONE)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingSourceMultipleTimes() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
+				.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
+														.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> mapped = source.map(
+					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+							return null;
+						}
+			});
+
+			DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+
+			joined5.coGroup(joined10)
+					.where(1).equalTo(1)
+					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			Plan plan = env.createProgramPlan();
+			OptimizedPlan oPlan = compileNoStats(plan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * 
+	 * <pre>
+
+	 *              (SINK A)
+	 *                  |    (SINK B)    (SINK C)
+	 *                CROSS    /          /
+	 *               /     \   |  +------+
+	 *              /       \  | /
+	 *          REDUCE      MATCH2
+	 *             |    +---/    \
+	 *              \  /          |
+	 *               MAP          |
+	 *                |           |
+	 *             COGROUP      MATCH1
+	 *             /     \     /     \
+	 *        (SRC A)    (SRC B)    (SRC C)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingWithMultipleDataSinks() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+			DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
+					.map(new Duplicator<Long>());
+
+			DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
+					.where(0).equalTo(1)
+					.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+							@Override
+							public void coGroup(Iterable<Tuple2<Long, Long>> first,
+													Iterable<Tuple2<Long, Long>> second,
+													Collector<Tuple2<Long, Long>> out) {
+							  }
+					})
+					.map(new IdentityMapper<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
+					.where(0).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
+					.where(1).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			DataSet<Tuple2<Long, Long>> reduced = mapped
+					.groupBy(1)
+					.reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
+
+			reduced.cross(joined2)
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+			Plan plan = env.createProgramPlan();
+			OptimizedPlan oPlan = compileNoStats(plan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testBranchEachContractType() {
+		try {
+			// construct the plan
+			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
+			FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
+			FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
+			
+			MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
+			
+			ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+				.input(map1)
+				.name("Reduce 1")
+				.build();
+			
+			JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+				.input1(sourceB, sourceB, sourceC)
+				.input2(sourceC)
+				.name("Match 1")
+				.build();
+			;
+			CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(sourceA)
+				.input2(sourceB)
+				.name("CoGroup 1")
+				.build();
+			
+			CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
+				.input1(reduce1)
+				.input2(cogroup1)
+				.name("Cross 1")
+				.build();
+			
+			
+			CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(cross1)
+				.input2(cross1)
+				.name("CoGroup 2")
+				.build();
+			
+			CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(map1)
+				.input2(match1)
+				.name("CoGroup 3")
+				.build();
+			
+			
+			MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
+			
+			CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(map2)
+				.input2(match1)
+				.name("CoGroup 4")
+				.build();
+			
+			CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(cogroup2)
+				.input2(cogroup1)
+				.name("CoGroup 5")
+				.build();
+			
+			CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(reduce1)
+				.input2(cogroup4)
+				.name("CoGroup 6")
+				.build();
+			
+			CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+				.input1(cogroup5)
+				.input2(cogroup6)
+				.name("CoGroup 7")
+				.build();
+			
+			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
+			sink.addInput(sourceA);
+			sink.addInput(cogroup3);
+			sink.addInput(cogroup4);
+			sink.addInput(cogroup1);
+			
+			// return the PACT plan
+			Plan plan = new Plan(sink, "Branching of each contract type");
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+			
+			JobGraphGenerator jobGen = new JobGraphGenerator();
+			
+			//Compile plan to verify that no error is thrown
+			jobGen.compileJobGraph(oPlan);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+
+	@Test
+	public void testBranchingUnion() {
+		try {
+			// construct the plan
+			FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
+			FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
+			
+			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+				.input1(source1)
+				.input2(source2)
+				.name("Match 1")
+				.build();
+			
+			MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
+			
+			ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+				.input(ma1)
+				.name("Reduce 1")
+				.build();
+			
+			ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+				.input(mat1)
+				.name("Reduce 2")
+				.build();
+			
+			MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
+			
+			MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
+			
+			@SuppressWarnings("unchecked")
+			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+				.input1(r1, r2, ma2, ma3)
+				.input2(ma2)
+				.name("Match 2")
+				.build();
+			mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
+			
+			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
+			
+			
+			// return the PACT plan
+			Plan plan = new Plan(sink, "Branching Union");
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+			
+			JobGraphGenerator jobGen = new JobGraphGenerator();
+			
+			//Compile plan to verify that no error is thrown
+			jobGen.compileJobGraph(oPlan);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * 
+	 * <pre>
+	 *             (SRC A)     
+	 *             /     \      
+	 *        (SINK A)    (SINK B)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingWithMultipleDataSinksSmall() {
+		try {
+			// construct the plan
+			final String out1Path = "file:///test/1";
+			final String out2Path = "file:///test/2";
+	
+			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+			
+			FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
+			FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
+			
+			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+			sinks.add(sinkA);
+			sinks.add(sinkB);
+			
+			// return the PACT plan
+			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+			
+			// ---------- check the optimizer plan ----------
+			
+			// number of sinks
+			Assert.assertEquals("Wrong number of data sinks.", 2, oPlan.getDataSinks().size());
+			
+			// sinks contain all sink paths
+			Set<String> allSinks = new HashSet<String>();
+			allSinks.add(out1Path);
+			allSinks.add(out2Path);
+			
+			for (SinkPlanNode n : oPlan.getDataSinks()) {
+				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
+				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
+			}
+			
+			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
+			
+			JobGraphGenerator jobGen = new JobGraphGenerator();
+			jobGen.compileJobGraph(oPlan);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * 
+	 * <pre>
+	 *     (SINK 3) (SINK 1)   (SINK 2) (SINK 4)
+	 *         \     /             \     /
+	 *         (SRC A)             (SRC B)
+	 * </pre>
+	 * 
+	 * NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
+	 */
+	@Test
+	public void testBranchingDisjointPlan() {
+		// construct the plan
+		final String out1Path = "file:///test/1";
+		final String out2Path = "file:///test/2";
+		final String out3Path = "file:///test/3";
+		final String out4Path = "file:///test/4";
+
+		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
+		
+		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
+		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
+		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
+		FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
+		
+		
+		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+		sinks.add(sink1);
+		sinks.add(sink2);
+		sinks.add(sink3);
+		sinks.add(sink4);
+		
+		// return the PACT plan
+		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
+		compileNoStats(plan);
+	}
+	
+	@Test
+	public void testBranchAfterIteration() {
+		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+		
+		BulkIteration iteration = new BulkIteration("Loop");
+		iteration.setInput(sourceA);
+		iteration.setMaximumNumberOfIterations(10);
+		
+		MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
+		iteration.setNextPartialSolution(mapper);
+		
+		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
+		
+		MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
+				.input(iteration).build();
+		
+		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
+		
+		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+		sinks.add(sink1);
+		sinks.add(sink2);
+		
+		Plan plan = new Plan(sinks);
+		
+		try {
+			compileNoStats(plan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchBeforeIteration() {
+		FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+		FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+		
+		BulkIteration iteration = new BulkIteration("Loop");
+		iteration.setInput(source2);
+		iteration.setMaximumNumberOfIterations(10);
+		
+		MapOperator inMap = MapOperator.builder(new IdentityMap())
+				                       .input(source1)
+				                       .name("In Iteration Map")
+				                       .setBroadcastVariable("BC", iteration.getPartialSolution())
+				                       .build();
+		
+		iteration.setNextPartialSolution(inMap);
+		
+		MapOperator postMap = MapOperator.builder(new IdentityMap())
+										 .input(source1)
+										 .name("Post Iteration Map")
+										 .setBroadcastVariable("BC", iteration)
+										 .build();
+		
+		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
+		
+		Plan plan = new Plan(sink);
+		
+		try {
+			compileNoStats(plan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Test to ensure that sourceA is inside as well as outside of the iteration the same
+	 * node.
+	 *
+	 * <pre>
+	 *       (SRC A)               (SRC B)
+	 *      /       \             /       \
+	 *  (SINK 1)   (ITERATION)    |     (SINK 2)
+	 *             /        \     /
+	 *         (SINK 3)     (CROSS => NEXT PARTIAL SOLUTION)
+	 * </pre>
+	 */
+	@Test
+	public void testClosure() {
+		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+
+		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
+		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
+
+		BulkIteration iteration = new BulkIteration("Loop");
+		iteration.setInput(sourceA);
+		iteration.setMaximumNumberOfIterations(10);
+
+		CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
+				input1(iteration.getPartialSolution()).
+				input2(sourceB).
+				build();
+
+		iteration.setNextPartialSolution(stepFunction);
+
+		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+
+		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+		sinks.add(sink1);
+		sinks.add(sink2);
+		sinks.add(sink3);
+
+		Plan plan = new Plan(sinks);
+
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * <pre>
+	 *       (SRC A)         (SRC B)          (SRC C)
+	 *      /       \       /                /       \
+	 *  (SINK 1) (DELTA ITERATION)          |     (SINK 2)
+	 *             /    |   \               /
+	 *         (SINK 3) |   (CROSS => NEXT WORKSET)
+	 *                  |             |
+	 *                (JOIN => SOLUTION SET DELTA)
+	 * </pre>
+	 */
+	@Test
+	public void testClosureDeltaIteration() {
+		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+		FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
+
+		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
+		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
+
+		DeltaIteration iteration = new DeltaIteration(0, "Loop");
+		iteration.setInitialSolutionSet(sourceA);
+		iteration.setInitialWorkset(sourceB);
+		iteration.setMaximumNumberOfIterations(10);
+
+		CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
+				input1(iteration.getWorkset()).
+				input2(sourceC).
+				build();
+
+		JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
+				name("Next solution set.").
+				input1(nextWorkset).
+				input2(iteration.getSolutionSet()).
+				build();
+
+		iteration.setNextWorkset(nextWorkset);
+		iteration.setSolutionSetDelta(solutionSetDelta);
+
+		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+
+		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+		sinks.add(sink1);
+		sinks.add(sink2);
+		sinks.add(sink3);
+
+		Plan plan = new Plan(sinks);
+
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * <pre>
+	 *                  +----Iteration-------+
+	 *                  |                    |
+	 *       /---------< >---------join-----< >---sink
+	 *      / (Solution)|           /        |
+	 *     /            |          /         |
+	 *    /--map-------< >----\   /       /--|
+	 *   /     (Workset)|      \ /       /   |
+	 * src-map          |     join------/    |
+	 *   \              |      /             |
+	 *    \             +-----/--------------+
+	 *     \                 /
+	 *      \--reduce-------/
+	 * </pre>
+	 */
+	@Test
+	public void testDeltaIterationWithStaticInput() {
+		FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
+
+		MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
+				input(source).
+				name("Identity mapped source").
+				build();
+
+		ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
+				input(source).
+				name("Identity reduce source").
+				build();
+
+		DeltaIteration iteration = new DeltaIteration(0,"Loop");
+		iteration.setMaximumNumberOfIterations(10);
+		iteration.setInitialSolutionSet(source);
+		iteration.setInitialWorkset(mappedSource);
+
+		JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
+				input1(iteration.getWorkset()).
+				input2(reducedSource).
+				name("Next work set").
+				build();
+
+		JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
+				0).
+				input1(iteration.getSolutionSet()).
+				input2(nextWorkset).
+				name("Solution set delta").
+				build();
+
+		iteration.setNextWorkset(nextWorkset);
+		iteration.setSolutionSetDelta(solutionSetDelta);
+
+		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
+		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+		sinks.add(sink);
+
+		Plan plan = new Plan(sinks);
+
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * <pre>
+	 *             +---------Iteration-------+
+	 *             |                         |
+	 *    /--map--< >----\                   |
+	 *   /         |      \         /-------< >---sink
+	 * src-map     |     join------/         |
+	 *   \         |      /                  |
+	 *    \        +-----/-------------------+
+	 *     \            /
+	 *      \--reduce--/
+	 * </pre>
+	 */
+	@Test
+	public void testIterationWithStaticInput() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(100);
+
+			DataSet<Long> source = env.generateSequence(1, 1000000);
+
+			DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
+
+			DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
+
+			IterativeDataSet<Long> iteration = mapped.iterate(10);
+			iteration.closeWith(
+					iteration.join(reduced)
+							.where(new IdentityKeyExtractor<Long>())
+							.equalTo(new IdentityKeyExtractor<Long>())
+							.with(new DummyFlatJoinFunction<Long>()))
+					.output(new DiscardingOutputFormat<Long>());
+
+			compileNoStats(env.createProgramPlan());
+		}
+		catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchingBroadcastVariable() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(100);
+
+		DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
+		DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
+		DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
+		
+		DataSet<String> result1 = input1
+				.map(new IdentityMapper<String>())
+				.reduceGroup(new Top1GroupReducer<String>())
+					.withBroadcastSet(input3, "bc");
+		
+		DataSet<String> result2 = input2
+				.map(new IdentityMapper<String>())
+				.reduceGroup(new Top1GroupReducer<String>())
+					.withBroadcastSet(input3, "bc");
+		
+		result1.join(result2)
+				.where(new IdentityKeyExtractor<String>())
+				.equalTo(new IdentityKeyExtractor<String>())
+				.with(new RichJoinFunction<String, String, String>() {
+					@Override
+					public String join(String first, String second) {
+						return null;
+					}
+				})
+				.withBroadcastSet(input3, "bc1")
+				.withBroadcastSet(input1, "bc2")
+				.withBroadcastSet(result1, "bc3")
+			.print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBCVariableClosure() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+		
+		DataSet<String> reduced = input
+				.map(new IdentityMapper<String>())
+				.reduceGroup(new Top1GroupReducer<String>());
+		
+		
+		DataSet<String> initialSolution = input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc");
+		
+		
+		IterativeDataSet<String> iteration = initialSolution.iterate(100);
+		
+		iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
+				.print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMultipleIterations() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(100);
+		
+		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+		
+		DataSet<String> reduced = input
+				.map(new IdentityMapper<String>())
+				.reduceGroup(new Top1GroupReducer<String>());
+			
+		IterativeDataSet<String> iteration1 = input.iterate(100);
+		IterativeDataSet<String> iteration2 = input.iterate(20);
+		IterativeDataSet<String> iteration3 = input.iterate(17);
+		
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMultipleIterationsWithClosueBCVars() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(100);
+
+		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+			
+		IterativeDataSet<String> iteration1 = input.iterate(100);
+		IterativeDataSet<String> iteration2 = input.iterate(20);
+		IterativeDataSet<String> iteration3 = input.iterate(17);
+		
+		
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		try{
+			compileNoStats(plan);
+		}catch(Exception e){
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables1() {
+		try{
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(100);
+
+			DataSet<Long> input = env.generateSequence(1, 10);
+			DataSet<Long> bc_input = env.generateSequence(1, 10);
+			
+			input
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables2() {
+		try{
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(100);
+
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
+			
+			DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
+			DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 1");
+			
+			DataSet<Tuple2<Long, Long>> joinInput1 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinInput2 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1, "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinResult = joinInput1
+				.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
+				.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
+			
+			input
+				.map(new IdentityMapper<Tuple2<Long,Long>>())
+					.withBroadcastSet(bc_input1, "bc1")
+				.union(joinResult)
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
new file mode 100644
index 0000000..c7ad2da
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+
+@SuppressWarnings("serial")
+public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
+
+	@Test
+	public void testNoBreakerForIndependentVariable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<String> source1 = env.fromElements("test");
+			DataSet<String> source2 = env.fromElements("test");
+			
+			source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	 @Test
+	public void testBreakerForDependentVariable() {
+			try {
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<String> source1 = env.fromElements("test");
+				
+				source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+				
+				assertEquals(TempMode.PIPELINE_BREAKER, mapper.getInput().getTempMode());
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
new file mode 100644
index 0000000..3e7da6c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+* Tests that validate optimizer choice when using hash joins inside of iterations
+*/
+@SuppressWarnings("serial")
+public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
+
+	/**
+	 * This tests whether a HYBRIDHASH_BUILD_SECOND is correctly transformed to a HYBRIDHASH_BUILD_SECOND_CACHED
+	 * when inside of an iteration an on the static path
+	 */
+	@Test
+	public void testRightSide() {
+		try {
+			
+			Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+	
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+			
+			// verify correct join strategy
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); 
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+		
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test errored: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
+	 */
+	@Test
+	public void testRightSideCountercheck() {
+		try {
+			
+			Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+	
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+			
+			// verify correct join strategy
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy()); 
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
+		
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test errored: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * This tests whether a HYBRIDHASH_BUILD_FIRST is correctly transformed to a HYBRIDHASH_BUILD_FIRST_CACHED
+	 * when inside of an iteration an on the static path
+	 */
+	@Test
+	public void testLeftSide() {
+		try {
+			
+			Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+	
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+			
+			// verify correct join strategy
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+		
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test errored: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
+	 */
+	@Test
+	public void testLeftSideCountercheck() {
+		try {
+			
+			Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+	
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+			
+			// verify correct join strategy
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+		
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test errored: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * This test simulates a join of a big left side with a small right side inside of an iteration, where the small side is on a static path.
+	 * Currently the best execution plan is a HYBRIDHASH_BUILD_SECOND_CACHED, where the small side is hashed and cached.
+	 * This test also makes sure that all relevant plans are correctly enumerated by the optimizer.
+	 */
+	@Test
+	public void testCorrectChoosing() {
+		try {
+			
+			Plan plan = getTestPlanRightStatic("");
+			
+			SourceCollectorVisitor sourceCollector = new SourceCollectorVisitor();
+			plan.accept(sourceCollector);
+			
+			for(GenericDataSourceBase<?, ?> s : sourceCollector.getSources()) {
+				if(s.getName().equals("bigFile")) {
+					this.setSourceStatistics(s, 10000000, 1000);
+				}
+				else if(s.getName().equals("smallFile")) {
+					this.setSourceStatistics(s, 100, 100);
+				}
+			}
+			
+			
+			OptimizedPlan oPlan = compileNoStats(plan);
+	
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+			
+			// verify correct join strategy
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+		
+			new JobGraphGenerator().compileJobGraph(oPlan);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test errored: " + e.getMessage());
+		}
+	}
+	
+	private Plan getTestPlanRightStatic(String strategy) {
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		
+		DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
+		
+		DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile").types(Long.class, Long.class, Long.class).name("smallFile");
+		
+		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
+		
+		Configuration joinStrategy = new Configuration();
+		joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+		
+		if(strategy != "") {
+			joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
+		}
+		
+		DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
+
+		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
+		
+		output.print();
+		
+		return env.createProgramPlan();
+		
+	}
+	
+	private Plan getTestPlanLeftStatic(String strategy) {
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),
+				new Tuple3<Long, Long, Long>(1L, 2L, 3L),new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big");
+		
+		@SuppressWarnings("unchecked")
+		DataSet<Tuple3<Long, Long, Long>> smallInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small");
+		
+		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
+		
+		Configuration joinStrategy = new Configuration();
+		joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
+		
+		DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
+
+		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
+		
+		output.print();
+		
+		return env.createProgramPlan();
+		
+	}
+	
+	private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
+
+		@Override
+		public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first,
+				Tuple3<Long, Long, Long> second) throws Exception {
+
+			return first;
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..eba07f1
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+
+@SuppressWarnings("serial")
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+	
+	public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
+	}
+
+	public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	@Test
+	public void testCoGroupSolutionSet() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+		DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+		DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+		result.print();
+
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = null;
+		try {
+			oPlan = compileNoStats(plan);
+		} catch(CompilerException e) {
+			Assert.fail(e.getMessage());
+		}
+
+		oPlan.accept(new Visitor<PlanNode>() {
+			@Override
+			public boolean preVisit(PlanNode visitable) {
+				if (visitable instanceof WorksetIterationPlanNode) {
+					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+
+					//get the CoGroup
+					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
+					Channel in1 = dpn.getInput1();
+					Channel in2 = dpn.getInput2();
+
+					Assert.assertTrue(in1.getLocalProperties().getOrdering() == null);
+					Assert.assertTrue(in2.getLocalProperties().getOrdering() != null);
+					Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+					Assert.assertTrue(in1.getShipStrategy() == ShipStrategyType.FORWARD);
+					Assert.assertTrue(in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
+					return false;
+				}
+				return true;
+			}
+
+			@Override
+			public void postVisit(PlanNode visitable) {}
+		});
+	}
+}