You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:08 UTC

[29/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
deleted file mode 100644
index c2e81a8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * Visitor that computes the interesting properties for each node in the optimizer DAG. On its recursive
- * depth-first descend, it propagates all interesting properties top-down.
- */
-public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-
-	private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
-
-	/**
-	 * Creates a new visitor that computes the interesting properties for all nodes in the plan.
-	 * It uses the given cost estimator used to compute the maximal costs for an interesting property.
-	 *
-	 * @param estimator
-	 *        The cost estimator to estimate the maximal costs for interesting properties.
-	 */
-	public InterestingPropertyVisitor(CostEstimator estimator) {
-		this.estimator = estimator;
-	}
-
-	@Override
-	public boolean preVisit(OptimizerNode node) {
-		// The interesting properties must be computed on the descend. In case a node has multiple outputs,
-		// that computation must happen during the last descend.
-
-		if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
-			node.computeUnionOfInterestingPropertiesFromSuccessors();
-			node.computeInterestingPropertiesForInputs(this.estimator);
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
deleted file mode 100644
index 58aa3c1..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This visitor traverses the selected execution plan and finalizes it:
- *
- * <ul>
- *     <li>The graph of nodes is double-linked (links from child to parent are inserted).</li>
- *     <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.</li>
- *     <li>Relative memory fractions are assigned to all nodes.</li>
- *     <li>All nodes are collected into a set.</li>
- * </ul>
- */
-public class PlanFinalizer implements Visitor<PlanNode> {
-
-	private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
-
-	private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
-
-	private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
-
-	private final Deque<IterationPlanNode> stackOfIterationNodes;
-
-	private int memoryConsumerWeights; // a counter of all memory consumers
-
-	/**
-	 * Creates a new plan finalizer.
-	 */
-	public PlanFinalizer() {
-		this.allNodes = new HashSet<PlanNode>();
-		this.sources = new ArrayList<SourcePlanNode>();
-		this.sinks = new ArrayList<SinkPlanNode>();
-		this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
-	}
-
-	public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
-		this.memoryConsumerWeights = 0;
-
-		// traverse the graph
-		for (SinkPlanNode node : sinks) {
-			node.accept(this);
-		}
-
-		// assign the memory to each node
-		if (this.memoryConsumerWeights > 0) {
-			for (PlanNode node : this.allNodes) {
-				// assign memory to the driver strategy of the node
-				final int consumerWeight = node.getMemoryConsumerWeight();
-				if (consumerWeight > 0) {
-					final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
-					node.setRelativeMemoryPerSubtask(relativeMem);
-					if (Optimizer.LOG.isDebugEnabled()) {
-						Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
-							node.getProgramOperator().getName() + ".");
-					}
-				}
-
-				// assign memory to the local and global strategies of the channels
-				for (Channel c : node.getInputs()) {
-					if (c.getLocalStrategy().dams()) {
-						final double relativeMem = 1.0 / this.memoryConsumerWeights;
-						c.setRelativeMemoryLocalStrategy(relativeMem);
-						if (Optimizer.LOG.isDebugEnabled()) {
-							Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
-									"instance of " + c + ".");
-						}
-					}
-					if (c.getTempMode() != TempMode.NONE) {
-						final double relativeMem = 1.0/ this.memoryConsumerWeights;
-						c.setRelativeTempMemory(relativeMem);
-						if (Optimizer.LOG.isDebugEnabled()) {
-							Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
-									"table for " + c + ".");
-						}
-					}
-				}
-			}
-		}
-		return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
-	}
-
-	@Override
-	public boolean preVisit(PlanNode visitable) {
-		// if we come here again, prevent a further descend
-		if (!this.allNodes.add(visitable)) {
-			return false;
-		}
-
-		if (visitable instanceof SinkPlanNode) {
-			this.sinks.add((SinkPlanNode) visitable);
-		}
-		else if (visitable instanceof SourcePlanNode) {
-			this.sources.add((SourcePlanNode) visitable);
-		}
-		else if (visitable instanceof BinaryUnionPlanNode) {
-			BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
-			if (unionNode.unionsStaticAndDynamicPath()) {
-				unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
-			}
-		}
-		else if (visitable instanceof BulkPartialSolutionPlanNode) {
-			// tell the partial solution about the iteration node that contains it
-			final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
-			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
-			// sanity check!
-			if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
-				throw new CompilerException("Bug: Error finalizing the plan. " +
-						"Cannot associate the node for a partial solutions with its containing iteration.");
-			}
-			pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
-		}
-		else if (visitable instanceof WorksetPlanNode) {
-			// tell the partial solution about the iteration node that contains it
-			final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
-			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
-			// sanity check!
-			if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
-				throw new CompilerException("Bug: Error finalizing the plan. " +
-						"Cannot associate the node for a partial solutions with its containing iteration.");
-			}
-			wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-		}
-		else if (visitable instanceof SolutionSetPlanNode) {
-			// tell the partial solution about the iteration node that contains it
-			final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
-			final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
-			// sanity check!
-			if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
-				throw new CompilerException("Bug: Error finalizing the plan. " +
-						"Cannot associate the node for a partial solutions with its containing iteration.");
-			}
-			sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-		}
-
-		// double-connect the connections. previously, only parents knew their children, because
-		// one child candidate could have been referenced by multiple parents.
-		for (Channel conn : visitable.getInputs()) {
-			conn.setTarget(visitable);
-			conn.getSource().addOutgoingChannel(conn);
-		}
-
-		for (Channel c : visitable.getBroadcastInputs()) {
-			c.setTarget(visitable);
-			c.getSource().addOutgoingChannel(c);
-		}
-
-		// count the memory consumption
-		this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
-		for (Channel c : visitable.getInputs()) {
-			if (c.getLocalStrategy().dams()) {
-				this.memoryConsumerWeights++;
-			}
-			if (c.getTempMode() != TempMode.NONE) {
-				this.memoryConsumerWeights++;
-			}
-		}
-		for (Channel c : visitable.getBroadcastInputs()) {
-			if (c.getLocalStrategy().dams()) {
-				this.memoryConsumerWeights++;
-			}
-			if (c.getTempMode() != TempMode.NONE) {
-				this.memoryConsumerWeights++;
-			}
-		}
-
-		// pass the visitor to the iteraton's step function
-		if (visitable instanceof IterationPlanNode) {
-			// push the iteration node onto the stack
-			final IterationPlanNode iterNode = (IterationPlanNode) visitable;
-			this.stackOfIterationNodes.addLast(iterNode);
-
-			// recurse
-			((IterationPlanNode) visitable).acceptForStepFunction(this);
-
-			// pop the iteration node from the stack
-			this.stackOfIterationNodes.removeLast();
-		}
-		return true;
-	}
-
-	@Override
-	public void postVisit(PlanNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
deleted file mode 100644
index c0dc4dd..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that goes over the program data flow of an iteration and makes the nodes
- * that depend on the partial solution (the data set recomputed in each iteration) as "dynamic"
- * and the other nodes as "static".
- */
-public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
-
-	private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
-
-	private final int costWeight;
-
-	public StaticDynamicPathIdentifier(int costWeight) {
-		this.costWeight = costWeight;
-	}
-
-	@Override
-	public boolean preVisit(OptimizerNode visitable) {
-		return this.seenBefore.add(visitable);
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {
-		visitable.identifyDynamicPath(this.costWeight);
-
-		// check that there is no nested iteration on the dynamic path
-		if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
-			throw new CompilerException("Nested iterations are currently not supported.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
deleted file mode 100644
index d359490..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that checks if the Workset of a delta iteration is used in the data flow
- * of its step function.
- */
-public class StepFunctionValidator implements Visitor<Operator<?>> {
-
-	private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
-
-	private boolean foundWorkset;
-
-	@Override
-	public boolean preVisit(Operator<?> visitable) {
-		if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) {
-			foundWorkset = true;
-		}
-
-		return (!foundWorkset) && seenBefore.add(visitable);
-	}
-
-	@Override
-	public void postVisit(Operator<?> visitable) {}
-
-	public boolean hasFoundWorkset() {
-		return foundWorkset;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
deleted file mode 100644
index cd8766c..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the various traversals over the program plan and the
- * optimizer DAG (directed acyclic graph) that are made in the course of
- * the optimization.
- *
- * The traversals are mostly implemented as a {@link org.apache.flink.util.Visitor} that
- * traversed the program flow.
- */
-package org.apache.flink.optimizer.traversals;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
deleted file mode 100644
index 5110849..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator {
-
-	public NoOpBinaryUdfOp(TypeInformation<OUT> type) {
-		super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return (Class<? extends Key<?>>[]) new Class[0];
-	}
-
-	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
-		throw new UnsupportedOperationException();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
deleted file mode 100644
index cc4a4d6..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator {
-	
-	@SuppressWarnings("rawtypes")
-	public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
-	
-	private NoOpUnaryUdfOp() {
-		// pass null here because we override getOutputType to return type
-		// of input operator
-		super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return (Class<? extends Key<?>>[]) new Class[0];
-	}
-
-	@Override
-	public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() {
-		TypeInformation<OUT> previousOut = input.getOperatorInfo().getOutputType();
-		return new UnaryOperatorInformation<OUT, OUT>(previousOut, previousOut);
-	}
-
-	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
-		return inputData;
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
deleted file mode 100644
index d8f33a2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-
-
-/**
- * 
- */
-public class Utils
-{
-	public static final FieldList createOrderedFromSet(FieldSet set) {
-		if (set instanceof FieldList) {
-			return (FieldList) set;
-		} else {
-			final int[] cols = set.toArray();
-			Arrays.sort(cols);
-			return new FieldList(cols);
-		}
-	}
-	
-	public static final Ordering createOrdering(FieldList fields, boolean[] directions) {
-		final Ordering o = new Ordering();
-		for (int i = 0; i < fields.size(); i++) {
-			o.appendOrdering(fields.get(i), null, directions == null || directions[i] ? Order.ASCENDING : Order.DESCENDING);
-		}
-		return o;
-	}
-	
-	public static final Ordering createOrdering(FieldList fields) {
-		final Ordering o = new Ordering();
-		for (int i = 0; i < fields.size(); i++) {
-			o.appendOrdering(fields.get(i), null, Order.ANY);
-		}
-		return o;
-	}
-	
-	public static boolean[] getDirections(Ordering o, int numFields) {
-		final boolean[] dirs = o.getFieldSortDirections();
-		if (dirs.length == numFields) {
-			return dirs;
-		} else if (dirs.length > numFields) {
-			final boolean[] subSet = new boolean[numFields];
-			System.arraycopy(dirs, 0, subSet, 0, numFields);
-			return subSet;
-		} else {
-			throw new CompilerException();
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * No instantiation.
-	 */
-	private Utils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
deleted file mode 100644
index 1e4bafb..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that are requesting certain specific execution
-* strategies.
-*/
-@SuppressWarnings({"serial", "deprecation"})
-public class AdditionalOperatorsTest extends CompilerTestBase {
-
-	@Test
-	public void testCrossWithSmall() {
-		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
-		try {
-			OptimizedPlan oPlan = compileNoStats(plan);
-			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
-			
-			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
-			Channel in1 = crossPlanNode.getInput1();
-			Channel in2 = crossPlanNode.getInput2();
-			
-			assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
-			assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
-		} catch(CompilerException ce) {
-			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
-		}
-	}
-	
-	@Test
-	public void testCrossWithLarge() {
-		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
-		try {
-			OptimizedPlan oPlan = compileNoStats(plan);
-			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
-			
-			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
-			Channel in1 = crossPlanNode.getInput1();
-			Channel in2 = crossPlanNode.getInput2();
-			
-			assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy());
-		} catch(CompilerException ce) {
-			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
deleted file mode 100644
index 916aa27..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ /dev/null
@@ -1,1039 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class BranchingPlansCompilerTest extends CompilerTestBase {
-	
-	
-	@Test
-	public void testCostComputationWithMultipleDataSinks() {
-		final int SINKS = 5;
-	
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-			DataSet<Long> source = env.generateSequence(1, 10000);
-
-			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
-			DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
-
-			for (int sink = 0; sink < SINKS; sink++) {
-				mappedA.output(new DiscardingOutputFormat<Long>());
-				mappedC.output(new DiscardingOutputFormat<Long>());
-			}
-
-			Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
-			OptimizedPlan oPlan = compileNoStats(plan);
-
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-
-	/**
-	 * 
-	 * <pre>
-	 *                (SRC A)  
-	 *                   |
-	 *                (MAP A)
-	 *             /         \   
-	 *          (MAP B)      (MAP C)
-	 *           /           /     \
-	 *        (SINK A)    (SINK B)  (SINK C)
-	 * </pre>
-	 */
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testBranchingWithMultipleDataSinks2() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-			DataSet<Long> source = env.generateSequence(1, 10000);
-
-			DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
-			DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
-			DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
-
-			mappedB.output(new DiscardingOutputFormat<Long>());
-			mappedC.output(new DiscardingOutputFormat<Long>());
-			mappedC.output(new DiscardingOutputFormat<Long>());
-
-			Plan plan = env.createProgramPlan();
-			Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
-
-			OptimizedPlan oPlan = compileNoStats(plan);
-
-			// ---------- check the optimizer plan ----------
-
-			// number of sinks
-			assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
-			// remove matching sinks to check relation
-			for (SinkPlanNode sink : oPlan.getDataSinks()) {
-				assertTrue(sinks.remove(sink.getProgramOperator()));
-			}
-			assertTrue(sinks.isEmpty());
-
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-
-	/**
-	 * <pre>
-	 *                              SINK
-	 *                               |
-	 *                            COGROUP
-	 *                        +---/    \----+
-	 *                       /               \
-	 *                      /             MATCH10
-	 *                     /               |    \
-	 *                    /                |  MATCH9
-	 *                MATCH5               |  |   \
-	 *                |   \                |  | MATCH8
-	 *                | MATCH4             |  |  |   \
-	 *                |  |   \             |  |  | MATCH7
-	 *                |  | MATCH3          |  |  |  |   \
-	 *                |  |  |   \          |  |  |  | MATCH6
-	 *                |  |  | MATCH2       |  |  |  |  |  |
-	 *                |  |  |  |   \       +--+--+--+--+--+
-	 *                |  |  |  | MATCH1            MAP 
-	 *                \  |  |  |  |  | /-----------/
-	 *                (DATA SOURCE ONE)
-	 * </pre>
-	 */
-	@Test
-	public void testBranchingSourceMultipleTimes() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-			DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
-				.map(new Duplicator<Long>());
-
-			DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
-														.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> mapped = source.map(
-					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-						@Override
-						public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
-							return null;
-						}
-			});
-
-			DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-
-			joined5.coGroup(joined10)
-					.where(1).equalTo(1)
-					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-
-				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
-			Plan plan = env.createProgramPlan();
-			OptimizedPlan oPlan = compileNoStats(plan);
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * 
-	 * <pre>
-
-	 *              (SINK A)
-	 *                  |    (SINK B)    (SINK C)
-	 *                CROSS    /          /
-	 *               /     \   |  +------+
-	 *              /       \  | /
-	 *          REDUCE      MATCH2
-	 *             |    +---/    \
-	 *              \  /          |
-	 *               MAP          |
-	 *                |           |
-	 *             COGROUP      MATCH1
-	 *             /     \     /     \
-	 *        (SRC A)    (SRC B)    (SRC C)
-	 * </pre>
-	 */
-	@Test
-	public void testBranchingWithMultipleDataSinks() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-			DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
-					.map(new Duplicator<Long>());
-
-			DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
-					.map(new Duplicator<Long>());
-
-			DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
-					.map(new Duplicator<Long>());
-
-			DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
-					.where(0).equalTo(1)
-					.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-							@Override
-							public void coGroup(Iterable<Tuple2<Long, Long>> first,
-													Iterable<Tuple2<Long, Long>> second,
-													Collector<Tuple2<Long, Long>> out) {
-							  }
-					})
-					.map(new IdentityMapper<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
-					.where(0).equalTo(1)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
-					.where(1).equalTo(1)
-					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-			DataSet<Tuple2<Long, Long>> reduced = mapped
-					.groupBy(1)
-					.reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
-
-			reduced.cross(joined2)
-					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
-			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-
-			Plan plan = env.createProgramPlan();
-			OptimizedPlan oPlan = compileNoStats(plan);
-			new JobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testBranchEachContractType() {
-		try {
-			// construct the plan
-			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
-			FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
-			FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
-			
-			MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
-			
-			ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(map1)
-				.name("Reduce 1")
-				.build();
-			
-			JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceB, sourceB, sourceC)
-				.input2(sourceC)
-				.name("Match 1")
-				.build();
-			;
-			CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(sourceA)
-				.input2(sourceB)
-				.name("CoGroup 1")
-				.build();
-			
-			CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
-				.input1(reduce1)
-				.input2(cogroup1)
-				.name("Cross 1")
-				.build();
-			
-			
-			CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cross1)
-				.input2(cross1)
-				.name("CoGroup 2")
-				.build();
-			
-			CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map1)
-				.input2(match1)
-				.name("CoGroup 3")
-				.build();
-			
-			
-			MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
-			
-			CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map2)
-				.input2(match1)
-				.name("CoGroup 4")
-				.build();
-			
-			CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup2)
-				.input2(cogroup1)
-				.name("CoGroup 5")
-				.build();
-			
-			CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(reduce1)
-				.input2(cogroup4)
-				.name("CoGroup 6")
-				.build();
-			
-			CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup5)
-				.input2(cogroup6)
-				.name("CoGroup 7")
-				.build();
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
-			sink.addInput(sourceA);
-			sink.addInput(cogroup3);
-			sink.addInput(cogroup4);
-			sink.addInput(cogroup1);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching of each contract type");
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			
-			//Compile plan to verify that no error is thrown
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-
-	@Test
-	public void testBranchingUnion() {
-		try {
-			// construct the plan
-			FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			
-			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(source1)
-				.input2(source2)
-				.name("Match 1")
-				.build();
-			
-			MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
-			
-			ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(ma1)
-				.name("Reduce 1")
-				.build();
-			
-			ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(mat1)
-				.name("Reduce 2")
-				.build();
-			
-			MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
-			
-			MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
-			
-			@SuppressWarnings("unchecked")
-			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(r1, r2, ma2, ma3)
-				.input2(ma2)
-				.name("Match 2")
-				.build();
-			mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
-			
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching Union");
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			
-			//Compile plan to verify that no error is thrown
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * 
-	 * <pre>
-	 *             (SRC A)     
-	 *             /     \      
-	 *        (SINK A)    (SINK B)
-	 * </pre>
-	 */
-	@Test
-	public void testBranchingWithMultipleDataSinksSmall() {
-		try {
-			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-	
-			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-			
-			FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-			FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-			
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-			sinks.add(sinkA);
-			sinks.add(sinkB);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			// ---------- check the optimizer plan ----------
-			
-			// number of sinks
-			Assert.assertEquals("Wrong number of data sinks.", 2, oPlan.getDataSinks().size());
-			
-			// sinks contain all sink paths
-			Set<String> allSinks = new HashSet<String>();
-			allSinks.add(out1Path);
-			allSinks.add(out2Path);
-			
-			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
-				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
-			}
-			
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-			
-			JobGraphGenerator jobGen = new JobGraphGenerator();
-			jobGen.compileJobGraph(oPlan);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	/**
-	 * 
-	 * <pre>
-	 *     (SINK 3) (SINK 1)   (SINK 2) (SINK 4)
-	 *         \     /             \     /
-	 *         (SRC A)             (SRC B)
-	 * </pre>
-	 * 
-	 * NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
-	 */
-	@Test
-	public void testBranchingDisjointPlan() {
-		// construct the plan
-		final String out1Path = "file:///test/1";
-		final String out2Path = "file:///test/2";
-		final String out3Path = "file:///test/3";
-		final String out4Path = "file:///test/4";
-
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
-		FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
-		
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
-		sinks.add(sink4);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
-		compileNoStats(plan);
-	}
-	
-	@Test
-	public void testBranchAfterIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
-		iteration.setNextPartialSolution(mapper);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
-		
-		MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
-				.input(iteration).build();
-		
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		
-		Plan plan = new Plan(sinks);
-		
-		try {
-			compileNoStats(plan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testBranchBeforeIteration() {
-		FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(source2);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator inMap = MapOperator.builder(new IdentityMap())
-				                       .input(source1)
-				                       .name("In Iteration Map")
-				                       .setBroadcastVariable("BC", iteration.getPartialSolution())
-				                       .build();
-		
-		iteration.setNextPartialSolution(inMap);
-		
-		MapOperator postMap = MapOperator.builder(new IdentityMap())
-										 .input(source1)
-										 .name("Post Iteration Map")
-										 .setBroadcastVariable("BC", iteration)
-										 .build();
-		
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
-		
-		Plan plan = new Plan(sink);
-		
-		try {
-			compileNoStats(plan);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Test to ensure that sourceA is inside as well as outside of the iteration the same
-	 * node.
-	 *
-	 * <pre>
-	 *       (SRC A)               (SRC B)
-	 *      /       \             /       \
-	 *  (SINK 1)   (ITERATION)    |     (SINK 2)
-	 *             /        \     /
-	 *         (SINK 3)     (CROSS => NEXT PARTIAL SOLUTION)
-	 * </pre>
-	 */
-	@Test
-	public void testClosure() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
-
-		CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
-				input1(iteration.getPartialSolution()).
-				input2(sourceB).
-				build();
-
-		iteration.setNextPartialSolution(stepFunction);
-
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
-
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
-
-		Plan plan = new Plan(sinks);
-
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * <pre>
-	 *       (SRC A)         (SRC B)          (SRC C)
-	 *      /       \       /                /       \
-	 *  (SINK 1) (DELTA ITERATION)          |     (SINK 2)
-	 *             /    |   \               /
-	 *         (SINK 3) |   (CROSS => NEXT WORKSET)
-	 *                  |             |
-	 *                (JOIN => SOLUTION SET DELTA)
-	 * </pre>
-	 */
-	@Test
-	public void testClosureDeltaIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
-
-		DeltaIteration iteration = new DeltaIteration(0, "Loop");
-		iteration.setInitialSolutionSet(sourceA);
-		iteration.setInitialWorkset(sourceB);
-		iteration.setMaximumNumberOfIterations(10);
-
-		CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
-				input1(iteration.getWorkset()).
-				input2(sourceC).
-				build();
-
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
-				name("Next solution set.").
-				input1(nextWorkset).
-				input2(iteration.getSolutionSet()).
-				build();
-
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
-
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
-
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
-
-		Plan plan = new Plan(sinks);
-
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * <pre>
-	 *                  +----Iteration-------+
-	 *                  |                    |
-	 *       /---------< >---------join-----< >---sink
-	 *      / (Solution)|           /        |
-	 *     /            |          /         |
-	 *    /--map-------< >----\   /       /--|
-	 *   /     (Workset)|      \ /       /   |
-	 * src-map          |     join------/    |
-	 *   \              |      /             |
-	 *    \             +-----/--------------+
-	 *     \                 /
-	 *      \--reduce-------/
-	 * </pre>
-	 */
-	@Test
-	public void testDeltaIterationWithStaticInput() {
-		FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-		MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
-				input(source).
-				name("Identity mapped source").
-				build();
-
-		ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
-				input(source).
-				name("Identity reduce source").
-				build();
-
-		DeltaIteration iteration = new DeltaIteration(0,"Loop");
-		iteration.setMaximumNumberOfIterations(10);
-		iteration.setInitialSolutionSet(source);
-		iteration.setInitialWorkset(mappedSource);
-
-		JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
-				input1(iteration.getWorkset()).
-				input2(reducedSource).
-				name("Next work set").
-				build();
-
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
-				0).
-				input1(iteration.getSolutionSet()).
-				input2(nextWorkset).
-				name("Solution set delta").
-				build();
-
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
-
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink);
-
-		Plan plan = new Plan(sinks);
-
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * <pre>
-	 *             +---------Iteration-------+
-	 *             |                         |
-	 *    /--map--< >----\                   |
-	 *   /         |      \         /-------< >---sink
-	 * src-map     |     join------/         |
-	 *   \         |      /                  |
-	 *    \        +-----/-------------------+
-	 *     \            /
-	 *      \--reduce--/
-	 * </pre>
-	 */
-	@Test
-	public void testIterationWithStaticInput() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(100);
-
-			DataSet<Long> source = env.generateSequence(1, 1000000);
-
-			DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
-
-			DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
-
-			IterativeDataSet<Long> iteration = mapped.iterate(10);
-			iteration.closeWith(
-					iteration.join(reduced)
-							.where(new IdentityKeyExtractor<Long>())
-							.equalTo(new IdentityKeyExtractor<Long>())
-							.with(new DummyFlatJoinFunction<Long>()))
-					.output(new DiscardingOutputFormat<Long>());
-
-			compileNoStats(env.createProgramPlan());
-		}
-		catch(Exception e){
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testBranchingBroadcastVariable() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(100);
-
-		DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
-		DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
-		DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
-		
-		DataSet<String> result1 = input1
-				.map(new IdentityMapper<String>())
-				.reduceGroup(new Top1GroupReducer<String>())
-					.withBroadcastSet(input3, "bc");
-		
-		DataSet<String> result2 = input2
-				.map(new IdentityMapper<String>())
-				.reduceGroup(new Top1GroupReducer<String>())
-					.withBroadcastSet(input3, "bc");
-		
-		result1.join(result2)
-				.where(new IdentityKeyExtractor<String>())
-				.equalTo(new IdentityKeyExtractor<String>())
-				.with(new RichJoinFunction<String, String, String>() {
-					@Override
-					public String join(String first, String second) {
-						return null;
-					}
-				})
-				.withBroadcastSet(input3, "bc1")
-				.withBroadcastSet(input1, "bc2")
-				.withBroadcastSet(result1, "bc3")
-			.print();
-		
-		Plan plan = env.createProgramPlan();
-		
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testBCVariableClosure() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
-		
-		DataSet<String> reduced = input
-				.map(new IdentityMapper<String>())
-				.reduceGroup(new Top1GroupReducer<String>());
-		
-		
-		DataSet<String> initialSolution = input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc");
-		
-		
-		IterativeDataSet<String> iteration = initialSolution.iterate(100);
-		
-		iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
-				.print();
-		
-		Plan plan = env.createProgramPlan();
-		
-		try{
-			compileNoStats(plan);
-		}catch(Exception e){
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	/** Three iterations over one source; every step function reads the same reduced set as a broadcast variable. */
-	@Test
-	public void testMultipleIterations() {
-		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		environment.setDegreeOfParallelism(100);
-		final DataSet<String> source = environment.readTextFile(IN_FILE).name("source1");
-		final DataSet<String> top1 = source.map(new IdentityMapper<String>()).reduceGroup(new Top1GroupReducer<String>());
-
-		final IterativeDataSet<String> loopA = source.iterate(100);
-		final IterativeDataSet<String> loopB = source.iterate(20);
-		final IterativeDataSet<String> loopC = source.iterate(17);
-
-		// one step function per iteration, each with its own broadcast variable name
-		final DataSet<String> stepA = loopA.map(new IdentityMapper<String>()).withBroadcastSet(top1, "bc1");
-		loopA.closeWith(stepA).print();
-		final DataSet<String> stepB = loopB.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(top1, "bc2");
-		loopB.closeWith(stepB).print();
-		final DataSet<String> stepC = loopC.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(top1, "bc3");
-		loopC.closeWith(stepC).print();
-
-		final Plan programPlan = environment.createProgramPlan();
-		try {
-			compileNoStats(programPlan);
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			Assert.fail(ex.getMessage());
-		}
-	}
-	
-	/** Three independent iterations over one source; despite the method name, no broadcast variable is attached. */
-	@Test
-	public void testMultipleIterationsWithClosueBCVars() {
-		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		environment.setDegreeOfParallelism(100);
-		final DataSet<String> source = environment.readTextFile(IN_FILE).name("source1");
-		final IterativeDataSet<String> loopA = source.iterate(100);
-		final IterativeDataSet<String> loopB = source.iterate(20);
-		final IterativeDataSet<String> loopC = source.iterate(17);
-
-		final DataSet<String> stepA = loopA.map(new IdentityMapper<String>());
-		loopA.closeWith(stepA).print();
-		final DataSet<String> stepB = loopB.reduceGroup(new Top1GroupReducer<String>());
-		loopB.closeWith(stepB).print();
-		final DataSet<String> stepC = loopC.reduceGroup(new IdentityGroupReducer<String>());
-		loopC.closeWith(stepC).print();
-
-		final Plan programPlan = environment.createProgramPlan();
-		try {
-			compileNoStats(programPlan);
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			Assert.fail(ex.getMessage());
-		}
-	}
-	
-	/** Feeds one data set as broadcast variable ("name1", "name2") into two chained mappers. */
-	@Test
-	public void testBranchesOnlyInBCVariables1() {
-		try {
-			final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-			environment.setDegreeOfParallelism(100);
-
-			final DataSet<Long> mainInput = environment.generateSequence(1, 10);
-			final DataSet<Long> broadcastInput = environment.generateSequence(1, 10);
-
-			// the only data set with two consumers is the broadcast input
-			final DataSet<Long> firstMapped = mainInput.map(new IdentityMapper<Long>()).withBroadcastSet(broadcastInput, "name1");
-			final DataSet<Long> secondMapped = firstMapped.map(new IdentityMapper<Long>()).withBroadcastSet(broadcastInput, "name2");
-			secondMapped.print();
-
-			final Plan programPlan = environment.createProgramPlan();
-			compileNoStats(programPlan);
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			fail(ex.getMessage());
-		}
-	}
-	
-	/** Three mappers over one input share two broadcast sources; one mapper reads the first source through an extra mapper. */
-	@Test
-	public void testBranchesOnlyInBCVariables2() {
-		try{
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(100);
-			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
-			
-			// distinct names keep the two broadcast sources distinguishable
-			DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
-			DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
-			
-			DataSet<Tuple2<Long, Long>> joinInput1 =
-					input.map(new IdentityMapper<Tuple2<Long,Long>>())
-						.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
-						.withBroadcastSet(bc_input2, "bc2");
-			
-			DataSet<Tuple2<Long, Long>> joinInput2 =
-					input.map(new IdentityMapper<Tuple2<Long,Long>>())
-						.withBroadcastSet(bc_input1, "bc1")
-						.withBroadcastSet(bc_input2, "bc2");
-			
-			DataSet<Tuple2<Long, Long>> joinResult = joinInput1
-				.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
-				.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
-			
-			input
-				.map(new IdentityMapper<Tuple2<Long,Long>>())
-					.withBroadcastSet(bc_input1, "bc1")
-				.union(joinResult)
-				.print();
-			
-			Plan plan = env.createProgramPlan();
-			compileNoStats(plan);
-		} catch(Exception e){
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	/** Mapper that emits its input twice, as both fields of a {@code Tuple2}. */
-	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
-		@Override
-		public Tuple2<T, T> map(final T element) {
-			return new Tuple2<T, T>(element, element);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
deleted file mode 100644
index c7ad2da..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-
-/** Tests asserting on the {@code TempMode} of the input channel of a mapper that has a broadcast variable. */
-@SuppressWarnings("serial")
-public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
-
-	/** Broadcast variable from a second, separate source: {@code TempMode.NONE} is expected on the mapper input. */
-	@Test
-	public void testNoBreakerForIndependentVariable() {
-		try {
-			final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-
-			final DataSet<String> mainSource = environment.fromElements("test");
-			final DataSet<String> broadcastSource = environment.fromElements("test");
-
-			mainSource.map(new IdentityMapper<String>()).withBroadcastSet(broadcastSource, "some name").print();
-
-			assertEquals(TempMode.NONE, mapperInputTempMode(environment));
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			fail(ex.getMessage());
-		}
-	}
-
-	/** Broadcast variable taken from the mapper's own source: {@code TempMode.PIPELINE_BREAKER} is expected. */
-	@Test
-	public void testBreakerForDependentVariable() {
-		try {
-			final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-
-			final DataSet<String> source = environment.fromElements("test");
-
-			final DataSet<String> firstMapped = source.map(new IdentityMapper<String>());
-			firstMapped.map(new IdentityMapper<String>()).withBroadcastSet(source, "some name").print();
-
-			assertEquals(TempMode.PIPELINE_BREAKER, mapperInputTempMode(environment));
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			fail(ex.getMessage());
-		}
-	}
-
-	/** Compiles the program of {@code environment} and returns the temp mode of the input of the node feeding the first sink. */
-	private TempMode mapperInputTempMode(ExecutionEnvironment environment) {
-		final Plan program = environment.createProgramPlan();
-		final OptimizedPlan optimized = compileNoStats(program);
-
-		final SinkPlanNode sink = optimized.getDataSinks().iterator().next();
-		final SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-		return mapper.getInput().getTempMode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
deleted file mode 100644
index 3e7da6c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
-* Tests that validate optimizer choice when using hash joins inside of iterations
-*/
-@SuppressWarnings("serial")
-public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
-
-	/**
-	 * Requests the HASH_BUILD_SECOND local strategy for a join inside an iteration whose second input
-	 * is on the static path, and expects the optimizer to pick HYBRIDHASH_BUILD_SECOND_CACHED
-	 * with TempMode.NONE on both join inputs.
-	 */
-	@Test
-	public void testRightSide() {
-		try {
-			final Plan program = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-			final OptimizedPlan optimized = compileNoStats(program);
-
-			final OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
-			final DualInputPlanNode join = nodes.getNode("DummyJoiner");
-
-			// the hinted strategy must have been replaced by its cached variant
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, join.getDriverStrategy());
-			// no temp mode is expected on either input channel
-			assertEquals(TempMode.NONE, join.getInput1().getTempMode());
-			assertEquals(TempMode.NONE, join.getInput2().getTempMode());
-
-			// job graph generation is run on the optimized plan as well
-			new JobGraphGenerator().compileJobGraph(optimized);
-		} catch (Exception ex) {
-			System.err.println(ex.getMessage());
-			ex.printStackTrace();
-			fail("Test errored: " + ex.getMessage());
-		}
-	}
-	
-	/**
-	 * Counter-check: with the HASH_BUILD_FIRST hint on the same plan, the non-cached HYBRIDHASH_BUILD_FIRST
-	 * is expected, together with TempMode.CACHED on the second (static) join input.
-	 */
-	@Test
-	public void testRightSideCountercheck() {
-		try {
-			final Plan program = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-			final OptimizedPlan optimized = compileNoStats(program);
-
-			final OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
-			final DualInputPlanNode join = nodes.getNode("DummyJoiner");
-
-			// the hinted strategy must be kept as is
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, join.getDriverStrategy());
-			// only the second input channel is expected to be cached
-			assertEquals(TempMode.NONE, join.getInput1().getTempMode());
-			assertEquals(TempMode.CACHED, join.getInput2().getTempMode());
-
-			// job graph generation is run on the optimized plan as well
-			new JobGraphGenerator().compileJobGraph(optimized);
-		} catch (Exception ex) {
-			System.err.println(ex.getMessage());
-			ex.printStackTrace();
-			fail("Test errored: " + ex.getMessage());
-		}
-	}
-	
-	/**
-	 * Requests the HASH_BUILD_FIRST local strategy for a join inside an iteration whose first input
-	 * is on the static path, and expects the optimizer to pick HYBRIDHASH_BUILD_FIRST_CACHED
-	 * with TempMode.NONE on both join inputs.
-	 */
-	@Test
-	public void testLeftSide() {
-		try {
-			final Plan program = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-			final OptimizedPlan optimized = compileNoStats(program);
-
-			final OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
-			final DualInputPlanNode join = nodes.getNode("DummyJoiner");
-
-			// the hinted strategy must have been replaced by its cached variant
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, join.getDriverStrategy());
-			// no temp mode is expected on either input channel
-			assertEquals(TempMode.NONE, join.getInput1().getTempMode());
-			assertEquals(TempMode.NONE, join.getInput2().getTempMode());
-
-			// job graph generation is run on the optimized plan as well
-			new JobGraphGenerator().compileJobGraph(optimized);
-		} catch (Exception ex) {
-			System.err.println(ex.getMessage());
-			ex.printStackTrace();
-			fail("Test errored: " + ex.getMessage());
-		}
-	}
-	
-	/**
-	 * Counter-check: with the HASH_BUILD_SECOND hint on the same plan, the non-cached HYBRIDHASH_BUILD_SECOND
-	 * is expected, together with TempMode.CACHED on the first (static) join input.
-	 */
-	@Test
-	public void testLeftSideCountercheck() {
-		try {
-			final Plan program = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-			final OptimizedPlan optimized = compileNoStats(program);
-
-			final OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
-			final DualInputPlanNode join = nodes.getNode("DummyJoiner");
-
-			// the hinted strategy must be kept as is
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, join.getDriverStrategy());
-			// only the first input channel is expected to be cached
-			assertEquals(TempMode.CACHED, join.getInput1().getTempMode());
-			assertEquals(TempMode.NONE, join.getInput2().getTempMode());
-
-			// job graph generation is run on the optimized plan as well
-			new JobGraphGenerator().compileJobGraph(optimized);
-		} catch (Exception ex) {
-			System.err.println(ex.getMessage());
-			ex.printStackTrace();
-			fail("Test errored: " + ex.getMessage());
-		}
-	}
-	
-	/**
-	 * Joins the iteration input ("bigFile") with the static "smallFile" source without a local strategy hint,
-	 * after registering source statistics for both, and expects HYBRIDHASH_BUILD_SECOND_CACHED to be chosen.
-	 * NOTE(review): the plan is compiled via compileNoStats although statistics were just set; confirm this is intended.
-	 */
-	@Test
-	public void testCorrectChoosing() {
-		try {
-			final Plan program = getTestPlanRightStatic("");
-
-			// collect the sources of the plan so that statistics can be attached by name
-			final SourceCollectorVisitor collector = new SourceCollectorVisitor();
-			program.accept(collector);
-
-			// "bigFile" and "smallFile" are the names given to the sources in getTestPlanRightStatic
-			for (GenericDataSourceBase<?, ?> source : collector.getSources()) {
-				final String sourceName = source.getName();
-				if (sourceName.equals("bigFile")) {
-					this.setSourceStatistics(source, 10000000, 1000);
-				} else if (sourceName.equals("smallFile")) {
-					this.setSourceStatistics(source, 100, 100);
-				}
-			}
-
-			final OptimizedPlan optimized = compileNoStats(program);
-
-			final OptimizerPlanNodeResolver nodes = getOptimizerPlanNodeResolver(optimized);
-			final DualInputPlanNode join = nodes.getNode("DummyJoiner");
-
-			// expected: cached build-second hash join, no temp mode on either input
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, join.getDriverStrategy());
-			assertEquals(TempMode.NONE, join.getInput1().getTempMode());
-			assertEquals(TempMode.NONE, join.getInput2().getTempMode());
-
-			// job graph generation is run on the optimized plan as well
-			new JobGraphGenerator().compileJobGraph(optimized);
-		} catch (Exception ex) {
-			System.err.println(ex.getMessage());
-			ex.printStackTrace();
-			fail("Test errored: " + ex.getMessage());
-		}
-	}
-	
-	/**
-	 * Builds a 10-step iteration over "bigFile" whose step joins the iteration input (first input) with the
-	 * "smallFile" source (second input, outside the loop), using a repartition-hash ship strategy hint.
-	 * @param strategy local strategy hint to set on the join; the empty string sets no local strategy hint
-	 * @return the program plan of the constructed job
-	 */
-	private Plan getTestPlanRightStatic(String strategy) {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
-		DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile").types(Long.class, Long.class, Long.class).name("smallFile");
-
-		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
-
-		Configuration joinStrategy = new Configuration();
-		joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-		// compare by value: a reference comparison only recognizes "" when the caller passes the interned literal
-		if (!"".equals(strategy)) {
-			joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
-		}
-
-		DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-		iteration.closeWith(inner).print();
-
-		return env.createProgramPlan();
-	}
-	
-	/**
-	 * Builds a 10-step iteration over a three-element "Big" source whose step joins the one-element
-	 * "Small" source (first input, outside the loop) with the iteration input (second input).
-	 * @param strategy local strategy hint that is set on the join unconditionally
-	 * @return the program plan of the constructed job
-	 */
-	private Plan getTestPlanLeftStatic(String strategy) {
-		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		environment.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		@SuppressWarnings("unchecked")
-		final DataSet<Tuple3<Long, Long, Long>> big = environment.fromElements(
-				new Tuple3<Long, Long, Long>(1L, 2L, 3L),
-				new Tuple3<Long, Long, Long>(1L, 2L, 3L),
-				new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big");
-		@SuppressWarnings("unchecked")
-		final DataSet<Tuple3<Long, Long, Long>> small = environment.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small");
-
-		final IterativeDataSet<Tuple3<Long, Long, Long>> loop = big.iterate(10);
-		final Configuration joinHints = new Configuration();
-		joinHints.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
-
-		final DataSet<Tuple3<Long, Long, Long>> joined = small.join(loop).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinHints);
-		loop.closeWith(joined).print();
-		return environment.createProgramPlan();
-	}
-	
-	/** Join function that returns the record of the first input and ignores the second. */
-	private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
-
-		@Override
-		public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) throws Exception {
-			// the second record is never looked at
-			return first;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
deleted file mode 100644
index eba07f1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Visitor;
-
-/** Checks the CoGroup input channels when the solution set of a delta iteration is the CoGroup's first input. */
-@SuppressWarnings("serial")
-public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
-	
-	/** CoGroup function that emits nothing. */
-	public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
-		@Override
-		public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
-	}
-
-	/** Map function that returns {@code null} for every record; here it only serves to build the plan. */
-	public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
-		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
-			return null;
-		}
-	}
-
-	@Test
-	public void testCoGroupSolutionSet() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
-		DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
-		DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
-		DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
-		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
-		iteration.closeWith(delta, feedback).print();
-
-		Plan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = null;
-		try {
-			oPlan = compileNoStats(plan);
-		} catch(CompilerException e) {
-			Assert.fail(e.getMessage());
-		}
-
-		// set by the visitor, so that the test cannot pass without the assertions below having run
-		final boolean[] iterationChecked = { false };
-		oPlan.accept(new Visitor<PlanNode>() {
-			@Override
-			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof WorksetIterationPlanNode) {
-					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
-					// the CoGroup is the source of the delta node's first input
-					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
-					Channel in1 = dpn.getInput1();
-					Channel in2 = dpn.getInput2();
-					Assert.assertNull(in1.getLocalProperties().getOrdering());
-					Assert.assertNotNull(in2.getLocalProperties().getOrdering());
-					Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
-					Assert.assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
-					Assert.assertEquals(ShipStrategyType.PARTITION_HASH, in2.getShipStrategy());
-					iterationChecked[0] = true;
-					return false;
-				}
-				return true;
-			}
-			@Override
-			public void postVisit(PlanNode visitable) {}
-		});
-		Assert.assertTrue("No WorksetIterationPlanNode was visited.", iterationChecked[0]);
-	}
-}