You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:15 UTC

[36/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
deleted file mode 100644
index 0d1dfc9..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-/**
- * Enumeration to indicate the mode of temporarily materializing the data that flows across a connection.
- * Introducing such an artificial dam is sometimes necessary to prevent certain data flows from deadlocking
- * themselves, or to serve as a cache for replaying an intermediate result.
- */
-public enum TempMode {
-	
-	NONE(false, false),
-	PIPELINE_BREAKER(false, true),
-	CACHED(true, false),
-	CACHING_PIPELINE_BREAKER(true, true);
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private final boolean cached;
-	
-	private final boolean breaksPipeline;
-	
-	
-	private TempMode(boolean cached, boolean breaksPipeline) {
-		this.cached = cached;
-		this.breaksPipeline = breaksPipeline;
-	}
-
-	public boolean isCached() {
-		return cached;
-	}
-
-	public boolean breaksPipeline() {
-		return breaksPipeline;
-	}
-	
-	public TempMode makePipelineBreaker() {
-		if (this == NONE) {
-			return PIPELINE_BREAKER;
-		} else if (this == CACHED) {
-			return CACHING_PIPELINE_BREAKER;
-		} else {
-			return this;
-		}
-	}
-	
-	public TempMode makeCached() {
-		if (this == NONE) {
-			return CACHED;
-		} else if (this == PIPELINE_BREAKER) {
-			return CACHING_PIPELINE_BREAKER;
-		} else {
-			return this;
-		}
-	}
-	
-	
-	public TempMode makeNonCached() {
-		if (this == CACHED) {
-			return NONE;
-		} else if (this == CACHING_PIPELINE_BREAKER) {
-			return PIPELINE_BREAKER;
-		} else {
-			return this;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
deleted file mode 100644
index 39da165..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.collect.Sets;
-
-/**
- * A node in the optimizer plan that represents a PACT with two different inputs, such as MATCH or CROSS.
- * The two inputs are not interchangeable; each one is bound to its own side.
- */
-public abstract class TwoInputNode extends OptimizerNode {
-	
-	protected final FieldList keys1; // The set of key fields for the first input
-	
-	protected final FieldList keys2; // The set of key fields for the second input
-	
-	protected DagConnection input1; // The first input edge
-
-	protected DagConnection input2; // The second input edge
-	
-	private List<OperatorDescriptorDual> cachedDescriptors;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new node with two inputs for the optimizer plan.
-	 * 
-	 * @param pactContract
-	 *        The PACT that the node represents.
-	 */
-	public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) {
-		super(pactContract);
-
-		int[] k1 = pactContract.getKeyColumns(0);
-		int[] k2 = pactContract.getKeyColumns(1);
-		
-		this.keys1 = k1 == null || k1.length == 0 ? null : new FieldList(k1);
-		this.keys2 = k2 == null || k2.length == 0 ? null : new FieldList(k2);
-		
-		if (this.keys1 != null) {
-			if (this.keys2 != null) {
-				if (this.keys1.size() != this.keys2.size()) {
-					throw new CompilerException("Unequal number of key fields on the two inputs.");
-				}
-			} else {
-				throw new CompilerException("Keys are set on first input, but not on second.");
-			}
-		} else if (this.keys2 != null) {
-			throw new CompilerException("Keys are set on second input, but not on first.");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public DualInputOperator<?, ?, ?, ?> getOperator() {
-		return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
-	}
-
-	/**
-	 * Gets the <tt>DagConnection</tt> through which this node receives its <i>first</i> input.
-	 * 
-	 * @return The first input connection.
-	 */
-	public DagConnection getFirstIncomingConnection() {
-		return this.input1;
-	}
-
-	/**
-	 * Gets the <tt>DagConnection</tt> through which this node receives its <i>second</i> input.
-	 * 
-	 * @return The second input connection.
-	 */
-	public DagConnection getSecondIncomingConnection() {
-		return this.input2;
-	}
-	
-	public OptimizerNode getFirstPredecessorNode() {
-		if(this.input1 != null) {
-			return this.input1.getSource();
-		} else {
-			return null;
-		}
-	}
-
-	public OptimizerNode getSecondPredecessorNode() {
-		if(this.input2 != null) {
-			return this.input2.getSource();
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public List<DagConnection> getIncomingConnections() {
-		ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2);
-		inputs.add(input1);
-		inputs.add(input2);
-		return inputs;
-	}
-
-
-	@Override
-	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) {
-		// see if there is a hint that dictates which shipping strategy to use for BOTH inputs
-		final Configuration conf = getOperator().getParameters();
-		ShipStrategyType preSet1 = null;
-		ShipStrategyType preSet2 = null;
-		
-		String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
-		if (shipStrategy != null) {
-			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-				preSet1 = preSet2 = ShipStrategyType.FORWARD;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-				preSet1 = preSet2 = ShipStrategyType.BROADCAST;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-				preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-				preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
-				preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM;
-			} else {
-				throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy);
-			}
-		}
-
-		// see if there is a hint that dictates which shipping strategy to use for the FIRST input
-		shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
-		if (shipStrategy != null) {
-			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-				preSet1 = ShipStrategyType.FORWARD;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-				preSet1 = ShipStrategyType.BROADCAST;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-				preSet1 = ShipStrategyType.PARTITION_HASH;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-				preSet1 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
-				preSet1 = ShipStrategyType.PARTITION_RANDOM;
-			} else {
-				throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy);
-			}
-		}
-
-		// see if there is a hint that dictates which shipping strategy to use for the SECOND input
-		shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
-		if (shipStrategy != null) {
-			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-				preSet2 = ShipStrategyType.FORWARD;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-				preSet2 = ShipStrategyType.BROADCAST;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-				preSet2 = ShipStrategyType.PARTITION_HASH;
-			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-				preSet2 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
-				preSet2 = ShipStrategyType.PARTITION_RANDOM;
-			} else {
-				throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy);
-			}
-		}
-		
-		// get the predecessors
-		DualInputOperator<?, ?, ?, ?> contr = getOperator();
-		
-		Operator<?> leftPred = contr.getFirstInput();
-		Operator<?> rightPred = contr.getSecondInput();
-		
-		OptimizerNode pred1;
-		DagConnection conn1;
-		if (leftPred == null) {
-			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for first input.");
-		} else {
-			pred1 = contractToNode.get(leftPred);
-			conn1 = new DagConnection(pred1, this, defaultExecutionMode);
-			if (preSet1 != null) {
-				conn1.setShipStrategy(preSet1);
-			}
-		} 
-		
-		// create the connection and add it
-		this.input1 = conn1;
-		pred1.addOutgoingConnection(conn1);
-		
-		OptimizerNode pred2;
-		DagConnection conn2;
-		if (rightPred == null) {
-			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input.");
-		} else {
-			pred2 = contractToNode.get(rightPred);
-			conn2 = new DagConnection(pred2, this, defaultExecutionMode);
-			if (preSet2 != null) {
-				conn2.setShipStrategy(preSet2);
-			}
-		}
-		
-		// create the connection and add it
-		this.input2 = conn2;
-		pred2.addOutgoingConnection(conn2);
-	}
-	
-	protected abstract List<OperatorDescriptorDual> getPossibleProperties();
-
-	private List<OperatorDescriptorDual> getProperties() {
-		if (this.cachedDescriptors == null) {
-			this.cachedDescriptors = getPossibleProperties();
-		}
-		return this.cachedDescriptors;
-	}
-	
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		// get what we inherit and what is preserved by our user code 
-		final InterestingProperties props1 = getInterestingProperties().filterByCodeAnnotations(this, 0);
-		final InterestingProperties props2 = getInterestingProperties().filterByCodeAnnotations(this, 1);
-		
-		// add all properties relevant to this node
-		for (OperatorDescriptorDual dpd : getProperties()) {
-			for (GlobalPropertiesPair gp : dpd.getPossibleGlobalProperties()) {
-				// input 1
-				props1.addGlobalProperties(gp.getProperties1());
-				
-				// input 2
-				props2.addGlobalProperties(gp.getProperties2());
-			}
-			for (LocalPropertiesPair lp : dpd.getPossibleLocalProperties()) {
-				// input 1
-				props1.addLocalProperties(lp.getProperties1());
-				
-				// input 2
-				props2.addLocalProperties(lp.getProperties2());
-			}
-		}
-		this.input1.setInterestingProperties(props1);
-		this.input2.setInterestingProperties(props2);
-		
-		for (DagConnection conn : getBroadcastConnections()) {
-			conn.setInterestingProperties(new InterestingProperties());
-		}
-	}
-
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		// check if we have a cached version
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		}
-
-		boolean childrenSkippedDueToReplicatedInput = false;
-
-		// step down to all producer nodes and calculate alternative plans
-		final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
-		final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
-
-		// get the interesting global properties requested on the two regular inputs
-		final Set<RequestedGlobalProperties> intGlobal1 = this.input1.getInterestingProperties().getGlobalProperties();
-		final Set<RequestedGlobalProperties> intGlobal2 = this.input2.getInterestingProperties().getGlobalProperties();
-		
-		// calculate alternative sub-plans for broadcast inputs
-		final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
-		List<DagConnection> broadcastConnections = getBroadcastConnections();
-		List<String> broadcastConnectionNames = getBroadcastConnectionNames();
-
-		for (int i = 0; i < broadcastConnections.size(); i++ ) {
-			DagConnection broadcastConnection = broadcastConnections.get(i);
-			String broadcastConnectionName = broadcastConnectionNames.get(i);
-			List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
-
-			// wrap the plan candidates in named channels
-			HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
-			for (PlanNode plan: broadcastPlanCandidates) {
-				final NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
-				DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
-											ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
-				c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
-				broadcastChannels.add(c);
-			}
-			broadcastPlanChannels.add(broadcastChannels);
-		}
-		
-		final GlobalPropertiesPair[] allGlobalPairs;
-		final LocalPropertiesPair[] allLocalPairs;
-		{
-			Set<GlobalPropertiesPair> pairsGlob = new HashSet<GlobalPropertiesPair>();
-			Set<LocalPropertiesPair> pairsLoc = new HashSet<LocalPropertiesPair>();
-			for (OperatorDescriptorDual ods : getProperties()) {
-				pairsGlob.addAll(ods.getPossibleGlobalProperties());
-				pairsLoc.addAll(ods.getPossibleLocalProperties());
-			}
-			allGlobalPairs = pairsGlob.toArray(new GlobalPropertiesPair[pairsGlob.size()]);
-			allLocalPairs = pairsLoc.toArray(new LocalPropertiesPair[pairsLoc.size()]);
-		}
-		
-		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
-		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
-		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
-
-		final int dop = getParallelism();
-		final int inDop1 = getFirstPredecessorNode().getParallelism();
-		final int inDop2 = getSecondPredecessorNode().getParallelism();
-
-		final boolean dopChange1 = dop != inDop1;
-		final boolean dopChange2 = dop != inDop2;
-
-		final boolean input1breaksPipeline = this.input1.isBreakingPipeline();
-		final boolean input2breaksPipeline = this.input2.isBreakingPipeline();
-
-		// enumerate all pairwise combinations of the children's plans together with
-		// all possible operator strategy combinations
-		
-		// create all candidates
-		for (PlanNode child1 : subPlans1) {
-
-			if (child1.getGlobalProperties().isFullyReplicated()) {
-				// fully replicated input is always locally forwarded if DOP is not changed
-				if (dopChange1) {
-					// can not continue with this child
-					childrenSkippedDueToReplicatedInput = true;
-					continue;
-				} else {
-					this.input1.setShipStrategy(ShipStrategyType.FORWARD);
-				}
-			}
-
-			for (PlanNode child2 : subPlans2) {
-
-				if (child2.getGlobalProperties().isFullyReplicated()) {
-					// fully replicated input is always locally forwarded if DOP is not changed
-					if (dopChange2) {
-						// can not continue with this child
-						childrenSkippedDueToReplicatedInput = true;
-						continue;
-					} else {
-						this.input2.setShipStrategy(ShipStrategyType.FORWARD);
-					}
-				}
-				
-				// check that the children go together. that is the case if they build upon the same
-				// candidate at the joined branch plan. 
-				if (!areBranchCompatible(child1, child2)) {
-					continue;
-				}
-				
-				for (RequestedGlobalProperties igps1: intGlobal1) {
-					// create a candidate channel for the first input. mark it cached, if the connection says so
-					final Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
-					if (this.input1.getShipStrategy() == null) {
-						// free to choose the ship strategy
-						igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline);
-						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
-						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
-							c1.getGlobalProperties().reset();
-						}
-					}
-					else {
-						// ship strategy fixed by compiler hint
-						ShipStrategyType shipType = this.input1.getShipStrategy();
-						DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline);
-						if (this.keys1 != null) {
-							c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode);
-						}
-						else {
-							c1.setShipStrategy(shipType, exMode);
-						}
-						
-						if (dopChange1) {
-							c1.adjustGlobalPropertiesForFullParallelismChange();
-						}
-					}
-					
-					for (RequestedGlobalProperties igps2: intGlobal2) {
-						// create a candidate channel for the second input. mark it cached, if the connection says so
-						final Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
-						if (this.input2.getShipStrategy() == null) {
-							// free to choose the ship strategy
-							igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
-							
-							// if the DOP changed, make sure that we cancel out properties, unless the
-							// ship strategy preserves/establishes them even under changing DOPs
-							if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
-								c2.getGlobalProperties().reset();
-							}
-						} else {
-							// ship strategy fixed by compiler hint
-							ShipStrategyType shipType = this.input2.getShipStrategy();
-							DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline);
-							if (this.keys2 != null) {
-								c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode);
-							} else {
-								c2.setShipStrategy(shipType, exMode);
-							}
-							
-							if (dopChange2) {
-								c2.adjustGlobalPropertiesForFullParallelismChange();
-							}
-						}
-						
-						/* ********************************************************************
-						 * NOTE: Depending on how we proceed with different partitioning,
-						 *       we might at some point need a compatibility check between
-						 *       the pairs of global properties.
-						 * *******************************************************************/
-						
-						outer:
-						for (GlobalPropertiesPair gpp : allGlobalPairs) {
-							if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
-								gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
-							{
-								for (OperatorDescriptorDual desc : getProperties()) {
-									if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), 
-											c1.getGlobalProperties(), c2.getGlobalProperties()))
-									{
-										Channel c1Clone = c1.clone();
-										c1Clone.setRequiredGlobalProps(gpp.getProperties1());
-										c2.setRequiredGlobalProps(gpp.getProperties2());
-										
-										// we form a valid combination, so create the local candidates
-										// for this
-										addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2,
-																			outputPlans, allLocalPairs, estimator);
-										break outer;
-									}
-								}
-							}
-						}
-						
-						// break the loop over input2's possible global properties, if the property
-						// is fixed via a hint. All the properties are overridden by the hint anyways,
-						// so we can stop after the first
-						if (this.input2.getShipStrategy() != null) {
-							break;
-						}
-					}
-					
-					// break the loop over input1's possible global properties, if the property
-					// is fixed via a hint. All the properties are overridden by the hint anyways,
-					// so we can stop after the first
-					if (this.input1.getShipStrategy() != null) {
-						break;
-					}
-				}
-			}
-		}
-
-		if(outputPlans.isEmpty()) {
-			if(childrenSkippedDueToReplicatedInput) {
-				throw new CompilerException("No plan meeting the requirements could be created @ " + this
-											+ ". Most likely reason: Invalid use of replicated input.");
-			} else {
-				throw new CompilerException("No plan meeting the requirements could be created @ " + this
-											+ ". Most likely reason: Too restrictive plan hints.");
-			}
-		}
-
-		// cost and prune the plans
-		for (PlanNode node : outputPlans) {
-			estimator.costOperator(node);
-		}
-		prunePlanAlternatives(outputPlans);
-		outputPlans.trimToSize();
-
-		this.cachedPlans = outputPlans;
-		return outputPlans;
-	}
-	
-	protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
-			RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
-			List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
-	{
-		for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
-			final Channel in1 = template1.clone();
-			ilp1.parameterizeChannel(in1);
-			
-			for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
-				final Channel in2 = template2.clone();
-				ilp2.parameterizeChannel(in2);
-				
-				for (OperatorDescriptorDual dps: getProperties()) {
-					for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
-						if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
-							lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
-						{
-							// valid combination
-							// for non-trivial local properties, we need to check that they are co-compatible
-							// (such as when some sort order is requested, that both are the same sort order)
-							if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
-								in1.getLocalProperties(), in2.getLocalProperties()))
-							{
-								// copy, because setting required properties and instantiation may
-								// change the channels and should not affect prior candidates
-								Channel in1Copy = in1.clone();
-								in1Copy.setRequiredLocalProps(lpp.getProperties1());
-								
-								Channel in2Copy = in2.clone();
-								in2Copy.setRequiredLocalProps(lpp.getProperties2());
-								
-								// all right, co compatible
-								instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
-								break;
-							}
-							// else cannot use this pair, fall through the loop and try the next one
-						}
-					}
-				}
-			}
-		}
-	}
-	
-	protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
-			List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-			RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
-			RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
-	{
-		final PlanNode inputSource1 = in1.getSource();
-		final PlanNode inputSource2 = in2.getSource();
-		
-		for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
-			
-			boolean validCombination = true;
-			
-			// check whether the broadcast inputs use the same plan candidate at the branching point
-			for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
-				NamedChannel nc = broadcastChannelsCombination.get(i);
-				PlanNode bcSource = nc.getSource();
-				
-				if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
-					validCombination = false;
-					break;
-				}
-				
-				// check branch compatibility against all other broadcast variables
-				for (int k = 0; k < i; k++) {
-					PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
-					
-					if (!areBranchCompatible(bcSource, otherBcSource)) {
-						validCombination = false;
-						break;
-					}
-				}
-			}
-			
-			if (!validCombination) {
-				continue;
-			}
-			
-			placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
-			
-			DualInputPlanNode node = operator.instantiate(in1, in2, this);
-			node.setBroadcastInputs(broadcastChannelsCombination);
-
-			SemanticProperties props = this.getSemanticProperties();
-			GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
-			GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
-			GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
-
-			LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
-			LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
-			LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
-			
-			node.initProperties(combined, locals);
-			node.updatePropertiesWithUniqueSets(getUniqueFields());
-			target.add(node);
-		}
-	}
-	
-	protected void placePipelineBreakersIfNecessary(DriverStrategy strategy, Channel in1, Channel in2) {
-		// before we instantiate, check for deadlocks by tracing back to the open branches and checking
-		// whether either no input, or all of them have a dam
-		if (this.hereJoinedBranches != null && this.hereJoinedBranches.size() > 0) {
-			boolean someDamOnLeftPaths = false;
-			boolean damOnAllLeftPaths = true;
-			boolean someDamOnRightPaths = false;
-			boolean damOnAllRightPaths = true;
-			
-			if (strategy.firstDam() == DamBehavior.FULL_DAM || in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) {
-				someDamOnLeftPaths = true;
-			} else {
-				for (OptimizerNode brancher : this.hereJoinedBranches) {
-					PlanNode candAtBrancher = in1.getSource().getCandidateAtBranchPoint(brancher);
-					
-					// not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars
-					if (candAtBrancher == null) {
-						continue;
-					}
-					
-					SourceAndDamReport res = in1.getSource().hasDamOnPathDownTo(candAtBrancher);
-					if (res == NOT_FOUND) {
-						throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
-					} else if (res == FOUND_SOURCE) {
-						damOnAllLeftPaths = false;
-					} else if (res == FOUND_SOURCE_AND_DAM) {
-						someDamOnLeftPaths = true;
-					} else {
-						throw new CompilerException();
-					}
-				}
-			}
-			
-			if (strategy.secondDam() == DamBehavior.FULL_DAM || in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) {
-				someDamOnRightPaths = true;
-			} else {
-				for (OptimizerNode brancher : this.hereJoinedBranches) {
-					PlanNode candAtBrancher = in2.getSource().getCandidateAtBranchPoint(brancher);
-					
-					// not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars
-					if (candAtBrancher == null) {
-						continue;
-					}
-					
-					SourceAndDamReport res = in2.getSource().hasDamOnPathDownTo(candAtBrancher);
-					if (res == NOT_FOUND) {
-						throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
-					} else if (res == FOUND_SOURCE) {
-						damOnAllRightPaths = false;
-					} else if (res == FOUND_SOURCE_AND_DAM) {
-						someDamOnRightPaths = true;
-					} else {
-						throw new CompilerException();
-					}
-				}
-			}
-			
-			// okay combinations are both all dam or both no dam
-			if ( (damOnAllLeftPaths & damOnAllRightPaths) | (!someDamOnLeftPaths & !someDamOnRightPaths) ) {
-				// good, either both materialize already on the way, or both fully pipeline
-			} else {
-				if (someDamOnLeftPaths & !damOnAllRightPaths) {
-					// right needs a pipeline breaker
-					in2.setTempMode(in2.getTempMode().makePipelineBreaker());
-				}
-				
-				if (someDamOnRightPaths & !damOnAllLeftPaths) {
-					// left needs a pipeline breaker
-					in1.setTempMode(in1.getTempMode().makePipelineBreaker());
-				}
-			}
-		}
-	}
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		// handle the data flow branching for the regular inputs
-		addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-		addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-		
-		List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
-		List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
-		ArrayList<UnclosedBranchDescriptor> inputsMerged = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, inputsMerged, true);
-		
-		// handle the data flow branching for the broadcast inputs
-		List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged);
-		
-		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return getOperator().getSemanticProperties();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                     Miscellaneous
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void accept(Visitor<OptimizerNode> visitor) {
-		if (visitor.preVisit(this)) {
-			if (this.input1 == null || this.input2 == null) {
-				throw new CompilerException();
-			}
-			
-			getFirstPredecessorNode().accept(visitor);
-			getSecondPredecessorNode().accept(visitor);
-			
-			for (DagConnection connection : getBroadcastConnections()) {
-				connection.getSource().accept(visitor);
-			}
-			
-			visitor.postVisit(this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
deleted file mode 100644
index 45ecdac..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-
-public class UnaryOperatorNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> operator;
-	
-	private final String name;
-
-
-	
-	public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ... operators) {
-		this(name, keys, Arrays.asList(operators));
-	}
-	
-	public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) {
-		super(keys);
-		
-		this.operator = operators;
-		this.name = name;
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.operator;
-	}
-
-	@Override
-	public String getName() {
-		return this.name;
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-	}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// we have no estimates by default
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
deleted file mode 100644
index e85f289..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the optimizer's program representation for a workset iteration.
- */
-public class WorksetIterationNode extends TwoInputNode implements IterationNode {
-	
-	private static final int DEFAULT_COST_WEIGHT = 20;
-	
-	
-	private final FieldList solutionSetKeyFields;
-	
-	private final GlobalProperties partitionedProperties;
-	
-	private final List<OperatorDescriptorDual> dataProperties;
-	
-	private SolutionSetNode solutionSetNode;
-	
-	private WorksetNode worksetNode;
-	
-	private OptimizerNode solutionSetDelta;
-	
-	private OptimizerNode nextWorkset;
-	
-	private DagConnection solutionSetDeltaRootConnection;
-	
-	private DagConnection nextWorksetRootConnection;
-	
-	private SingleRootJoiner singleRoot;
-	
-	private boolean solutionDeltaImmediatelyAfterSolutionJoin;
-	
-	private final int costWeight;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new optimizer node that represents the given workset (delta) iteration.
-	 * 
-	 * @param iteration The iteration operator that the node represents.
-	 */
-	public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) {
-		super(iteration);
-		
-		final int[] ssKeys = iteration.getSolutionSetKeyFields();
-		if (ssKeys == null || ssKeys.length == 0) {
-			throw new CompilerException("Invalid WorksetIteration: No key fields defined for the solution set.");
-		}
-		this.solutionSetKeyFields = new FieldList(ssKeys);
-		this.partitionedProperties = new GlobalProperties();
-		this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
-		
-		int weight = iteration.getMaximumNumberOfIterations() > 0 ? 
-			iteration.getMaximumNumberOfIterations() : DEFAULT_COST_WEIGHT;
-			
-		if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) {
-			weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
-		}
-		this.costWeight = weight; 
-		
-		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields));
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public DeltaIterationBase<?, ?> getIterationContract() {
-		return (DeltaIterationBase<?, ?>) getOperator();
-	}
-	
-	public SolutionSetNode getSolutionSetNode() {
-		return this.solutionSetNode;
-	}
-	
-	public WorksetNode getWorksetNode() {
-		return this.worksetNode;
-	}
-	
-	public OptimizerNode getNextWorkset() {
-		return this.nextWorkset;
-	}
-	
-	public OptimizerNode getSolutionSetDelta() {
-		return this.solutionSetDelta;
-	}
-
-	public void setPartialSolution(SolutionSetNode solutionSetNode, WorksetNode worksetNode) {
-		if (this.solutionSetNode != null || this.worksetNode != null) {
-			throw new IllegalStateException("Error: Initializing WorksetIterationNode multiple times.");
-		}
-		this.solutionSetNode = solutionSetNode;
-		this.worksetNode = worksetNode;
-	}
-	
-	public void setNextPartialSolution(OptimizerNode solutionSetDelta, OptimizerNode nextWorkset,
-										ExecutionMode executionMode) {
-
-		// check whether the solution set delta is itself a two-input operation on
-		// the solution set (so we can potentially do direct updates)
-		if (solutionSetDelta instanceof TwoInputNode) {
-			TwoInputNode solutionDeltaTwoInput = (TwoInputNode) solutionSetDelta;
-			if (solutionDeltaTwoInput.getFirstPredecessorNode() == this.solutionSetNode ||
-				solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode)
-			{
-				this.solutionDeltaImmediatelyAfterSolutionJoin = true;
-			}
-		}
-		
-		// there needs to be at least one node in the workset path, so if the next workset is equal
-		// to the workset, we need to inject a no-op node (this is also done for a binary union node)
-		if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
-			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getParallelism());
-
-			DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
-			noop.setIncomingConnection(noOpConn);
-			nextWorkset.addOutgoingConnection(noOpConn);
-			
-			nextWorkset = noop;
-		}
-		
-		// attach an extra node to the solution set delta for the cases where we need to repartition
-		UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
-				new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
-		solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
-
-		DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
-		solutionSetDeltaUpdateAux.setIncomingConnection(conn);
-		solutionSetDelta.addOutgoingConnection(conn);
-		
-		this.solutionSetDelta = solutionSetDeltaUpdateAux;
-		this.nextWorkset = nextWorkset;
-		
-		this.singleRoot = new SingleRootJoiner();
-		this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux,
-													this.singleRoot, executionMode);
-
-		this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode);
-		this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection);
-		
-		solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
-		nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection);
-	}
-	
-	public int getCostWeight() {
-		return this.costWeight;
-	}
-	
-	public TwoInputNode getSingleRootOfStepFunction() {
-		return this.singleRoot;
-	}
-	
-	public FieldList getSolutionSetKeyFields() {
-		return this.solutionSetKeyFields;
-	}
-	
-	public OptimizerNode getInitialSolutionSetPredecessorNode() {
-		return getFirstPredecessorNode();
-	}
-	
-	public OptimizerNode getInitialWorksetPredecessorNode() {
-		return getSecondPredecessorNode();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String getName() {
-		return "Workset Iteration";
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new EmptySemanticProperties();
-	}
-
-	protected void readStubAnnotations() {}
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedOutputSize = getFirstPredecessorNode().getEstimatedOutputSize();
-		this.estimatedNumRecords = getFirstPredecessorNode().getEstimatedNumRecords();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                             Properties and Optimization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-	
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		// our own solution (the solution set) is always partitioned and this cannot be adjusted
-		// depending on what the successor to the workset iteration requests. for that reason,
-		// we ignore incoming interesting properties.
-		
-		// in addition, we need to make 2 interesting property passes, because the root of the step function 
-		// that computes the next workset needs the interesting properties as generated by the
-		// workset source of the step function. the second pass concerns only the workset path.
-		// as initial interesting properties, we have the trivial ones for the step function,
-		// and partitioned on the solution set key for the solution set delta 
-		
-		RequestedGlobalProperties partitionedProperties = new RequestedGlobalProperties();
-		partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
-		InterestingProperties partitionedIP = new InterestingProperties();
-		partitionedIP.addGlobalProperties(partitionedProperties);
-		partitionedIP.addLocalProperties(new RequestedLocalProperties());
-		
-		this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties());
-		this.solutionSetDeltaRootConnection.setInterestingProperties(partitionedIP.clone());
-		
-		InterestingPropertyVisitor ipv = new InterestingPropertyVisitor(estimator);
-		this.nextWorkset.accept(ipv);
-		this.solutionSetDelta.accept(ipv);
-		
-		// take the interesting properties of the workset and add them to the root interesting properties
-		InterestingProperties worksetIntProps = this.worksetNode.getInterestingProperties();
-		InterestingProperties intProps = new InterestingProperties();
-		intProps.getGlobalProperties().addAll(worksetIntProps.getGlobalProperties());
-		intProps.getLocalProperties().addAll(worksetIntProps.getLocalProperties());
-		
-		// clear all interesting properties to prepare the second traversal
-		this.nextWorksetRootConnection.clearInterestingProperties();
-		this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
-		
-		// 2nd pass
-		this.nextWorksetRootConnection.setInterestingProperties(intProps);
-		this.nextWorkset.accept(ipv);
-		
-		// now add the interesting properties of the workset to the workset input
-		final InterestingProperties inProps = this.worksetNode.getInterestingProperties().clone();
-		inProps.addGlobalProperties(new RequestedGlobalProperties());
-		inProps.addLocalProperties(new RequestedLocalProperties());
-		this.input2.setInterestingProperties(inProps);
-		
-		// the initial solution set must be hash partitioned, so it has only that as interesting properties
-		this.input1.setInterestingProperties(partitionedIP);
-	}
-	
-	@Override
-	public void clearInterestingProperties() {
-		super.clearInterestingProperties();
-		
-		this.nextWorksetRootConnection.clearInterestingProperties();
-		this.solutionSetDeltaRootConnection.clearInterestingProperties();
-		
-		this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
-		this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
-	}
-	
-	@Override
-	protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
-			List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-			RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
-			RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
-	{
-		// check for pipeline breaking using hash join with build on the solution set side
-		placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn);
-		
-		// NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
-		// Whenever we instantiate the iteration, we enumerate new candidates for the step function.
-		// That way, we make sure that for each candidate for the initial partial solution,
-		// we have a fitting candidate for the step function (often, work is pushed out of the step function).
-		// Among the candidates of the step function, we keep only those that meet the requested properties of the
-		// current candidate initial partial solution. That makes sure these properties exist at the beginning of
-		// every iteration.
-		
-		// 1) Because we enumerate multiple times, we may need to clean the cached plans
-		//    before starting another enumeration
-		this.nextWorkset.accept(PlanCacheCleaner.INSTANCE);
-		this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE);
-		
-		// 2) Give the partial solution the properties of the current candidate for the initial partial solution
-		//    This concerns currently only the workset.
-		this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn);
-		this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn);
-		
-		final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode();
-		final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode();
-		
-		// 3) Get the alternative plans
-		List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator);
-		List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator);
-		
-		// 4) Throw away all that are not compatible with the properties currently requested to the
-		//    initial partial solution
-		
-		// Make sure that the workset candidates fulfill the input requirements
-		{
-			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
-			
-			for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
-				PlanNode candidate = planDeleter.next();
-				
-				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
-				LocalProperties atEndLocal = candidate.getLocalProperties();
-				
-				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn,
-																							atEndGlobal, atEndLocal);
-
-				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-					; // depends only through broadcast variable on the workset solution
-				}
-				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-					// attach a no-op node through which we create the properties of the original input
-					Channel toNoOp = new Channel(candidate);
-					globPropsReqWorkset.parameterizeChannel(toNoOp, false,
-															nextWorksetRootConnection.getDataExchangeMode(), false);
-					locPropsReqWorkset.parameterizeChannel(toNoOp);
-					
-					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
-																							FieldList.EMPTY_LIST);
-					
-					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
-					
-					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
-												rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
-												toNoOp, DriverStrategy.UNARY_NO_OP);
-					rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(),
-																	toNoOp.getLocalProperties());
-					estimator.costOperator(rebuildWorksetPropertiesPlanNode);
-						
-					GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
-					LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
-						
-					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
-						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(
-																		wspn, atEndGlobalModified, atEndLocalModified);
-						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-							newCandidates.add(rebuildWorksetPropertiesPlanNode);
-						}
-					}
-					
-					// remove the original operator and add the modified candidate
-					planDeleter.remove();
-					
-				}
-			}
-			
-			worksetCandidates.addAll(newCandidates);
-		}
-		
-		if (worksetCandidates.isEmpty()) {
-			return;
-		}
-		
-		// sanity check the solution set delta
-		for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) {
-			SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate;
-			GlobalProperties gp = candidate.getGlobalProperties();
-
-			if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null ||
-					!gp.getPartitioningFields().equals(this.solutionSetKeyFields)) {
-				throw new CompilerException("Bug: The solution set delta is not partitioned.");
-			}
-		}
-		
-		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
-		
-		final GlobalProperties gp = new GlobalProperties();
-		gp.setHashPartitioned(this.solutionSetKeyFields);
-		gp.addUniqueFieldCombination(this.solutionSetKeyFields);
-		
-		LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
-		
-		// take all combinations of solution set delta and workset plans
-		for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
-			for (PlanNode worksetCandidate : worksetCandidates) {
-				// check whether they have the same operator at their latest branching point
-				if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) {
-					
-					SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate;
-					boolean immediateDeltaUpdate;
-					
-					// check whether we need a dedicated solution set delta operator, or whether we can update on the fly
-					if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD &&
-							this.solutionDeltaImmediatelyAfterSolutionJoin)
-					{
-						// we do not need this extra node. we can make the predecessor the delta
-						// sanity check the node and connection
-						if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP ||
-								siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE)
-						{
-							throw new CompilerException("Invalid Solution set delta node.");
-						}
-						
-						solutionSetCandidate = siSolutionDeltaCandidate.getInput().getSource();
-						immediateDeltaUpdate = true;
-					} else {
-						// was not partitioned, we need to keep this node.
-						// mark that we materialize the input
-						siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER);
-						immediateDeltaUpdate = false;
-					}
-					
-					WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
-							"WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
-							worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
-					wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
-					wsNode.initProperties(gp, lp);
-					target.add(wsNode);
-				}
-			}
-		}
-	}
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-		
-		// IMPORTANT: First compute closed branches from the two inputs
-		// we need to do this because the runtime iteration head effectively joins
-		addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-		addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-
-		List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
-		List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
-		ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, inputsMerged1, true); // this method also sets which branches are joined here (in the head)
-		
-		addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
-
-		ArrayList<UnclosedBranchDescriptor> inputsMerged2 = new ArrayList<UnclosedBranchDescriptor>();
-		List<UnclosedBranchDescriptor> result3 = getSingleRootOfStepFunction().openBranches;
-		mergeLists(inputsMerged1, result3, inputsMerged2, true);
-
-		// handle the data flow branching for the broadcast inputs
-		List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged2);
-
-		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                      Iteration Specific Traversals
-	// --------------------------------------------------------------------------------------------
-
-	public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
-		this.singleRoot.accept(visitor);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                             Utility Classes
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class WorksetOpDescriptor extends OperatorDescriptorDual {
-		
-		private WorksetOpDescriptor(FieldList solutionSetKeys) {
-			super(solutionSetKeys, null);
-		}
-
-		@Override
-		public DriverStrategy getStrategy() {
-			return DriverStrategy.NONE;
-		}
-
-		@Override
-		protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-			RequestedGlobalProperties partitionedGp = new RequestedGlobalProperties();
-			partitionedGp.setHashPartitioned(this.keys1);
-			return Collections.singletonList(new GlobalPropertiesPair(partitionedGp, new RequestedGlobalProperties()));
-		}
-
-		@Override
-		protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-			// all properties are possible
-			return Collections.singletonList(new LocalPropertiesPair(
-				new RequestedLocalProperties(), new RequestedLocalProperties()));
-		}
-		
-		@Override
-		public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
-				GlobalProperties produced1, GlobalProperties produced2) {
-			return true;
-		}
-		
-		@Override
-		public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
-				LocalProperties produced1, LocalProperties produced2) {
-			return true;
-		}
-
-		@Override
-		public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
-			throw new UnsupportedOperationException();
-		}
-	}
-	
-	public static class SingleRootJoiner extends TwoInputNode {
-		
-		SingleRootJoiner() {
-			super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
-			
-			setDegreeOfParallelism(1);
-		}
-		
-		public void setInputs(DagConnection input1, DagConnection input2) {
-			this.input1 = input1;
-			this.input2 = input2;
-		}
-		
-		@Override
-		public String getName() {
-			return "Internal Utility Node";
-		}
-
-		@Override
-		protected List<OperatorDescriptorDual> getPossibleProperties() {
-			return Collections.emptyList();
-		}
-
-		@Override
-		protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-			// no estimates are needed here
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
deleted file mode 100644
index 3b05aba..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-
-/**
- * The optimizer's internal representation of the workset that is input to a workset (delta) iteration.
- */
-public class WorksetNode extends AbstractPartialSolutionNode {
-	
-	private final WorksetIterationNode iterationNode;
-	
-	
-	public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
-		super(psph);
-		this.iterationNode = iterationNode;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		if (this.cachedPlans != null) {
-			throw new IllegalStateException();
-		} else {
-			WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
-			this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
-		}
-	}
-	
-	public WorksetPlanNode getCurrentWorksetPlanNode() {
-		if (this.cachedPlans != null) {
-			return (WorksetPlanNode) this.cachedPlans.get(0);
-		} else {
-			throw new IllegalStateException();
-		}
-	}
-	
-	public WorksetIterationNode getIterationNode() {
-		return this.iterationNode;
-	}
-	
-	@Override
-	public void computeOutputEstimates(DataStatistics statistics) {
-		copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the workset placeholder operator that this node represents.
-	 * 
-	 * @return The workset placeholder.
-	 */
-	@Override
-	public WorksetPlaceHolder<?> getOperator() {
-		return (WorksetPlaceHolder<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Workset";
-	}
-	
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
-		OptimizerNode worksetSource = worksetInput.getSource();
-		
-		addClosedBranches(worksetSource.closedBranchingNodes);
-		List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput);
-		this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
deleted file mode 100644
index 57ba29d..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class represents global properties of the data at a certain point in the plan.
- * Global properties are properties that describe data across different partitions, such as
- * whether the data is hash partitioned, range partitioned, replicated, etc.
- *
- * <p>Instances are mutable. Note that {@link #equals(Object)} compares the partitioning, the
- * ordering, the partitioning fields and the unique field combinations, but not the custom
- * partitioner. {@link #hashCode()} is computed from the partitioning, the partitioning fields
- * and the ordering only.
- */
-public class GlobalProperties implements Cloneable {
-
-	public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
-
-	private PartitioningProperty partitioning;	// the type of partitioning
-
-	private FieldList partitioningFields;		// the fields which are partitioned
-
-	private Ordering ordering;					// order of the partitioned fields, if it is an ordered (range) partitioning
-
-	private Set<FieldSet> uniqueFieldCombinations;	// field combinations registered as unique, null if none
-
-	private Partitioner<?> customPartitioner;	// the partitioner given to setCustomPartitioned(...)
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Initializes the global properties with no partitioning.
-	 */
-	public GlobalProperties() {
-		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Sets this global properties to represent a hash partitioning.
-	 * Any previously set ordering is dropped. The custom partitioner and the unique
-	 * field combinations are left untouched.
-	 * 
-	 * @param partitionedFields The key fields on which the data is hash partitioned.
-	 * @throws NullPointerException If the given fields are null.
-	 */
-	public void setHashPartitioned(FieldList partitionedFields) {
-		if (partitionedFields == null) {
-			throw new NullPointerException("The partitioned fields may not be null.");
-		}
-
-		this.partitioning = PartitioningProperty.HASH_PARTITIONED;
-		this.partitioningFields = partitionedFields;
-		this.ordering = null;
-	}
-
-	/**
-	 * Sets this global properties to represent a range partitioning.
-	 * The partitioning fields are taken from the indexes involved in the ordering.
-	 *
-	 * @param ordering The ordering that describes the range partitioning.
-	 * @throws NullPointerException If the given ordering is null.
-	 */
-	public void setRangePartitioned(Ordering ordering) {
-		if (ordering == null) {
-			throw new NullPointerException("The ordering may not be null.");
-		}
-
-		this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
-		this.ordering = ordering;
-		this.partitioningFields = ordering.getInvolvedIndexes();
-	}
-
-	/**
-	 * Sets this global properties to the {@code ANY_PARTITIONING} kind on the given fields.
-	 * Any previously set ordering is dropped.
-	 *
-	 * @param partitionedFields The key fields of the partitioning.
-	 * @throws NullPointerException If the given fields are null.
-	 */
-	public void setAnyPartitioning(FieldList partitionedFields) {
-		if (partitionedFields == null) {
-			throw new NullPointerException("The partitioned fields may not be null.");
-		}
-
-		this.partitioning = PartitioningProperty.ANY_PARTITIONING;
-		this.partitioningFields = partitionedFields;
-		this.ordering = null;
-	}
-
-	/**
-	 * Sets this global properties to random partitioning and drops the
-	 * partitioning fields and the ordering.
-	 */
-	public void setRandomPartitioned() {
-		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-		this.partitioningFields = null;
-		this.ordering = null;
-	}
-
-	/**
-	 * Sets this global properties to full replication and drops the
-	 * partitioning fields and the ordering.
-	 */
-	public void setFullyReplicated() {
-		this.partitioning = PartitioningProperty.FULL_REPLICATION;
-		this.partitioningFields = null;
-		this.ordering = null;
-	}
-
-	/**
-	 * Sets this global properties to forced rebalancing and drops the
-	 * partitioning fields and the ordering.
-	 */
-	public void setForcedRebalanced() {
-		this.partitioning = PartitioningProperty.FORCED_REBALANCED;
-		this.partitioningFields = null;
-		this.ordering = null;
-	}
-
-	/**
-	 * Sets this global properties to represent a partitioning by a custom partitioner.
-	 * Any previously set ordering is dropped.
-	 *
-	 * @param partitionedFields The key fields handed to the partitioner.
-	 * @param partitioner The custom partitioner.
-	 * @throws NullPointerException If the fields or the partitioner are null.
-	 */
-	public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) {
-		if (partitionedFields == null || partitioner == null) {
-			throw new NullPointerException("The partitioned fields and the partitioner may not be null.");
-		}
-
-		this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
-		this.partitioningFields = partitionedFields;
-		this.ordering = null;
-		this.customPartitioner = partitioner;
-	}
-
-	/**
-	 * Registers a combination of fields as unique. A null argument is ignored.
-	 *
-	 * @param fields The unique field combination, may be null.
-	 */
-	public void addUniqueFieldCombination(FieldSet fields) {
-		if (fields == null) {
-			return;
-		}
-		if (this.uniqueFieldCombinations == null) {
-			this.uniqueFieldCombinations = new HashSet<FieldSet>();
-		}
-		this.uniqueFieldCombinations.add(fields);
-	}
-
-	/**
-	 * Removes all registered unique field combinations.
-	 */
-	public void clearUniqueFieldCombinations() {
-		this.uniqueFieldCombinations = null;
-	}
-
-	/**
-	 * Gets the registered unique field combinations.
-	 *
-	 * @return The internal set of unique field combinations (not a copy), or null, if none are registered.
-	 */
-	public Set<FieldSet> getUniqueFieldCombination() {
-		return this.uniqueFieldCombinations;
-	}
-
-	/**
-	 * Gets the fields on which the data is partitioned.
-	 *
-	 * @return The partitioning fields, or null, if none are set.
-	 */
-	public FieldList getPartitioningFields() {
-		return this.partitioningFields;
-	}
-
-	/**
-	 * Gets the ordering of the partitioning.
-	 *
-	 * @return The ordering, or null, if none is set.
-	 */
-	public Ordering getPartitioningOrdering() {
-		return this.ordering;
-	}
-
-	/**
-	 * Gets the kind of partitioning.
-	 *
-	 * @return The partitioning property.
-	 */
-	public PartitioningProperty getPartitioning() {
-		return this.partitioning;
-	}
-
-	/**
-	 * Gets the custom partitioner. The value is only assigned by
-	 * {@link #setCustomPartitioned(FieldList, Partitioner)} and is not cleared by the other
-	 * setters or by {@link #reset()}, so it is only meaningful for custom partitionings.
-	 *
-	 * @return The custom partitioner, or null, if none was ever set.
-	 */
-	public Partitioner<?> getCustomPartitioner() {
-		return this.customPartitioner;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Checks whether the data is partitioned on the given fields, either through a key-based
-	 * partitioning whose fields pass {@code fields.isValidSubset(...)}, or through one of the
-	 * registered unique field combinations passing the same check.
-	 *
-	 * @param fields The fields to check.
-	 * @return True, if one of the two checks succeeds, false otherwise.
-	 */
-	public boolean isPartitionedOnFields(FieldSet fields) {
-		if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) {
-			return true;
-		}
-		if (this.uniqueFieldCombinations != null) {
-			for (FieldSet uniqueCombination : this.uniqueFieldCombinations) {
-				if (fields.isValidSubset(uniqueCombination)) {
-					return true;
-				}
-			}
-		}
-		return false;
-	}
-
-	/**
-	 * Checks whether the data has a key-based partitioning whose fields are an exact match
-	 * of the given fields.
-	 *
-	 * @param fields The fields to check.
-	 * @return True, if the partitioning is key-based and the fields match exactly.
-	 */
-	public boolean isExactlyPartitionedOnFields(FieldList fields) {
-		return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields);
-	}
-
-	/**
-	 * Checks whether this range partitioning matches the given ordering. That is the case if the
-	 * fields of this partitioning's ordering are a prefix of the given ordering's fields, and if
-	 * for each of these fields the orders are compatible: {@code Order.NONE} in the given
-	 * ordering accepts everything, {@code Order.ANY} accepts every order except
-	 * {@code Order.NONE}, and every other order must be equal.
-	 *
-	 * @param o The ordering to match against.
-	 * @return True, if this is a range partitioning that matches the ordering, false otherwise.
-	 */
-	public boolean matchesOrderedPartitioning(Ordering o) {
-		if (this.partitioning != PartitioningProperty.RANGE_PARTITIONED) {
-			return false;
-		}
-
-		final int numFields = this.ordering.getNumberOfFields();
-		if (numFields > o.getNumberOfFields()) {
-			return false;
-		}
-
-		for (int i = 0; i < numFields; i++) {
-			if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) {
-				return false;
-			}
-
-			final Order requested = o.getOrder(i);
-			final Order present = this.ordering.getOrder(i);
-
-			if (requested == Order.NONE) {
-				// no order is requested for this field, everything is good
-				continue;
-			}
-			if (requested == Order.ANY) {
-				// if any order is requested, any not NONE order is good
-				if (present == Order.NONE) {
-					return false;
-				}
-			} else if (requested != present) {
-				// the orders must be equal
-				return false;
-			}
-		}
-		return true;
-	}
-
-	/**
-	 * Checks whether the data is fully replicated.
-	 *
-	 * @return True, if the partitioning is {@code FULL_REPLICATION}.
-	 */
-	public boolean isFullyReplicated() {
-		return this.partitioning == PartitioningProperty.FULL_REPLICATION;
-	}
-
-	/**
-	 * Checks, if the properties in this object are trivial. Only the kind of partitioning is
-	 * inspected: the properties count as trivial, if it is {@code RANDOM_PARTITIONED}.
-	 *
-	 * @return True, if the partitioning is random, false otherwise.
-	 */
-	public boolean isTrivial() {
-		return this.partitioning == PartitioningProperty.RANDOM_PARTITIONED;
-	}
-
-	/**
-	 * This method resets the partitioning to random partitioning and drops the ordering and
-	 * the partitioning fields. The unique field combinations and the custom partitioner
-	 * are not modified.
-	 */
-	public void reset() {
-		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-		this.ordering = null;
-		this.partitioningFields = null;
-	}
-
-	/**
-	 * Filters these GlobalProperties by the fields that are forwarded to the output
-	 * as described by the SemanticProperties.
-	 *
-	 * <p>A key-based partitioning is retained only if every one of its fields is forwarded,
-	 * otherwise the result has the default (random) partitioning. A unique field combination is
-	 * retained only if every one of its fields is forwarded. If a field is forwarded to several
-	 * target fields, the target field with the lowest index is used.
-	 *
-	 * @param props The semantic properties holding information about forwarded fields.
-	 * @param input The index of the input.
-	 * @return The filtered GlobalProperties
-	 * @throws NullPointerException If the semantic properties are null.
-	 */
-	public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-
-		if (props == null) {
-			throw new NullPointerException("SemanticProperties may not be null.");
-		}
-
-		final GlobalProperties gp = new GlobalProperties();
-
-		// filter partitioning
-		switch (this.partitioning) {
-			case RANGE_PARTITIONED: {
-				// check if ordering is preserved
-				final FieldList orderedFields = this.ordering.getInvolvedIndexes();
-				Ordering newOrdering = new Ordering();
-				for (int i = 0; i < orderedFields.size(); i++) {
-					final Integer target = lowestForwardedField(props, input, orderedFields.get(i));
-					if (target == null) {
-						// partitioning is destroyed
-						newOrdering = null;
-						break;
-					}
-					newOrdering.appendOrdering(target, this.ordering.getType(i), this.ordering.getOrder(i));
-				}
-				if (newOrdering != null) {
-					gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
-					gp.ordering = newOrdering;
-					gp.partitioningFields = newOrdering.getInvolvedIndexes();
-				}
-				break;
-			}
-			case HASH_PARTITIONED:
-			case ANY_PARTITIONING:
-			case CUSTOM_PARTITIONING: {
-				FieldList newPartitioningFields = new FieldList();
-				for (int sourceField : this.partitioningFields) {
-					final Integer target = lowestForwardedField(props, input, sourceField);
-					if (target == null) {
-						// partitioning is destroyed
-						newPartitioningFields = null;
-						break;
-					}
-					newPartitioningFields = newPartitioningFields.addField(target);
-				}
-				if (newPartitioningFields != null) {
-					gp.partitioning = this.partitioning;
-					gp.partitioningFields = newPartitioningFields;
-					gp.customPartitioner = this.customPartitioner;
-				}
-				break;
-			}
-			case FORCED_REBALANCED:
-			case FULL_REPLICATION:
-			case RANDOM_PARTITIONED:
-				// these kinds do not depend on any fields
-				gp.partitioning = this.partitioning;
-				break;
-			default:
-				throw new RuntimeException("Unknown partitioning type.");
-		}
-
-		// filter unique field combinations
-		if (this.uniqueFieldCombinations != null) {
-			final Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>();
-			for (FieldSet fieldCombo : this.uniqueFieldCombinations) {
-				FieldSet newFieldCombo = new FieldSet();
-				for (Integer sourceField : fieldCombo) {
-					final Integer target = lowestForwardedField(props, input, sourceField);
-					if (target == null) {
-						// the combination is not fully forwarded and is dropped
-						newFieldCombo = null;
-						break;
-					}
-					newFieldCombo = newFieldCombo.addField(target);
-				}
-				if (newFieldCombo != null) {
-					newUniqueFieldCombinations.add(newFieldCombo);
-				}
-			}
-			if (!newUniqueFieldCombinations.isEmpty()) {
-				gp.uniqueFieldCombinations = newUniqueFieldCombinations;
-			}
-		}
-
-		return gp;
-	}
-
-	/**
-	 * Looks up the target fields that the given source field is forwarded to and selects the one
-	 * with the lowest index. Logs a warning, if there is more than one target field.
-	 *
-	 * <p>The minimum is determined explicitly rather than by taking the first element of the
-	 * target set, because the element order of the set is not guaranteed by anything visible
-	 * here, while the logged warning promises the lowest index.
-	 * We should use something like field equivalence sets in the future.
-	 *
-	 * @param props The semantic properties holding information about forwarded fields.
-	 * @param input The index of the input.
-	 * @param sourceField The index of the source field.
-	 * @return The lowest target field index, or null, if the field is not forwarded.
-	 */
-	private static Integer lowestForwardedField(SemanticProperties props, int input, int sourceField) {
-		final FieldSet targetFields = props.getForwardingTargetFields(input, sourceField);
-		if (targetFields == null || targetFields.size() == 0) {
-			return null;
-		}
-
-		if (targetFields.size() > 1) {
-			LOG.warn("Found that a field is forwarded to more than one target field in " +
-					"semantic forwarded field information. Will only use the field with the lowest index.");
-		}
-
-		Integer lowest = null;
-		for (int candidate : targetFields) {
-			if (lowest == null || candidate < lowest) {
-				lowest = candidate;
-			}
-		}
-		return lowest;
-	}
-
-	/**
-	 * Configures the ship strategy of the given channel such that it establishes the
-	 * partitioning described by these global properties.
-	 *
-	 * @param channel The channel to parameterize.
-	 * @param globalDopChange Flag that selects random partitioning instead of forwarding
-	 *                        for randomly partitioned data.
-	 * @param exchangeMode The execution mode, used to select the data exchange mode.
-	 * @param breakPipeline Flag passed on to the selection of the data exchange mode.
-	 * @throws CompilerException If the partitioning kind is not supported.
-	 */
-	public void parameterizeChannel(Channel channel, boolean globalDopChange,
-									ExecutionMode exchangeMode, boolean breakPipeline) {
-
-		final ShipStrategyType shipType;
-
-		// most strategies need neither keys, nor sort directions, nor a partitioner
-		FieldList partitionKeys = null;
-		boolean[] sortDirection = null;
-		Partitioner<?> partitioner = null;
-
-		switch (this.partitioning) {
-			case RANDOM_PARTITIONED:
-				shipType = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
-				break;
-
-			case FULL_REPLICATION:
-				shipType = ShipStrategyType.BROADCAST;
-				break;
-
-			case ANY_PARTITIONING:
-			case HASH_PARTITIONED:
-				shipType = ShipStrategyType.PARTITION_HASH;
-				partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
-				break;
-
-			case RANGE_PARTITIONED:
-				shipType = ShipStrategyType.PARTITION_RANGE;
-				partitionKeys = this.ordering.getInvolvedIndexes();
-				sortDirection = this.ordering.getFieldSortDirections();
-				break;
-
-			case FORCED_REBALANCED:
-				shipType = ShipStrategyType.PARTITION_RANDOM;
-				break;
-
-			case CUSTOM_PARTITIONING:
-				shipType = ShipStrategyType.PARTITION_CUSTOM;
-				partitionKeys = this.partitioningFields;
-				partitioner = this.customPartitioner;
-				break;
-
-			default:
-				throw new CompilerException("Unsupported partitioning strategy");
-		}
-
-		final DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
-		channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Computes the hash code from the partitioning, the partitioning fields and the ordering.
-	 */
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
-		result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
-		result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
-		return result;
-	}
-
-	/**
-	 * Compares the partitioning, the ordering, the partitioning fields and the unique field
-	 * combinations. The custom partitioner is not part of the comparison.
-	 */
-	@Override
-	public boolean equals(Object obj) {
-		// instanceof is false for null, so no separate null check is needed
-		if (!(obj instanceof GlobalProperties)) {
-			return false;
-		}
-
-		final GlobalProperties other = (GlobalProperties) obj;
-		return this.partitioning == other.partitioning
-			&& sameOrEqual(this.ordering, other.ordering)
-			&& sameOrEqual(this.partitioningFields, other.partitioningFields)
-			&& sameOrEqual(this.uniqueFieldCombinations, other.uniqueFieldCombinations);
-	}
-
-	/**
-	 * Null-safe equality check: true, if both references are identical (including both null),
-	 * or if the first one is non-null and equal to the second one.
-	 */
-	private static boolean sameOrEqual(Object first, Object second) {
-		return first == second || (first != null && first.equals(second));
-	}
-
-	@Override
-	public String toString() {
-		final StringBuilder bld = new StringBuilder("GlobalProperties [partitioning=");
-		bld.append(this.partitioning);
-
-		if (this.partitioningFields != null) {
-			bld.append(", on fields ").append(this.partitioningFields);
-		}
-		if (this.ordering != null) {
-			bld.append(", with ordering ").append(this.ordering);
-		}
-		if (this.uniqueFieldCombinations != null) {
-			bld.append(" - Unique field groups: ").append(this.uniqueFieldCombinations);
-		}
-
-		bld.append(']');
-		return bld.toString();
-	}
-
-	/**
-	 * Creates a copy of these properties. The set of unique field combinations is copied into
-	 * a new set, all other members are shared with the copy.
-	 *
-	 * @return The copy of these properties.
-	 */
-	@Override
-	public GlobalProperties clone() {
-		final GlobalProperties newProps = new GlobalProperties();
-		newProps.partitioning = this.partitioning;
-		newProps.partitioningFields = this.partitioningFields;
-		newProps.ordering = this.ordering;
-		newProps.customPartitioner = this.customPartitioner;
-		if (this.uniqueFieldCombinations != null) {
-			newProps.uniqueFieldCombinations = new HashSet<FieldSet>(this.uniqueFieldCombinations);
-		}
-		return newProps;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Selects one of the two given global properties. The result is always one of the two
-	 * arguments (not a copy), except when both are fully replicated, in which case new default
-	 * properties are returned.
-	 *
-	 * <p>If exactly one of the two is fully replicated, the other one is returned. Otherwise, the
-	 * first match in the following sequence decides, where the first argument takes precedence
-	 * within each step: has an ordering, has partitioning fields, has unique field combinations,
-	 * has a partitioning for which {@code isPartitioned()} holds. If nothing matches, the first
-	 * argument is returned.
-	 *
-	 * @param gp1 The first global properties.
-	 * @param gp2 The second global properties.
-	 * @return The selected global properties.
-	 */
-	public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
-		final boolean firstReplicated = gp1.isFullyReplicated();
-		final boolean secondReplicated = gp2.isFullyReplicated();
-
-		if (firstReplicated && secondReplicated) {
-			return new GlobalProperties();
-		}
-		if (firstReplicated) {
-			return gp2;
-		}
-		if (secondReplicated) {
-			return gp1;
-		}
-
-		if (gp1.ordering != null) {
-			return gp1;
-		}
-		if (gp2.ordering != null) {
-			return gp2;
-		}
-		if (gp1.partitioningFields != null) {
-			return gp1;
-		}
-		if (gp2.partitioningFields != null) {
-			return gp2;
-		}
-		if (gp1.uniqueFieldCombinations != null) {
-			return gp1;
-		}
-		if (gp2.uniqueFieldCombinations != null) {
-			return gp2;
-		}
-		if (gp1.getPartitioning().isPartitioned()) {
-			return gp1;
-		}
-		if (gp2.getPartitioning().isPartitioned()) {
-			return gp2;
-		}
-		return gp1;
-	}
-}