You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:16 UTC

[37/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
deleted file mode 100644
index 0cad34e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ /dev/null
@@ -1,1172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.AbstractUdfOperator;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.plandump.DumpableNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitable;
-import org.apache.flink.util.Visitor;
-
-/**
- * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the
- * optimizer's representation of a program, created before the actual optimization (which creates different
- * candidate plans and computes their cost).
- * <p>
- * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed
- * to hold the additional information that the optimizer needs:
- * <ul>
- *     <li>Estimates of the data size processed by each operator</li>
- *     <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are
- *         DAGs but not trees.</li>
- *     <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li>
- *     <li>Interesting properties to be used during the enumeration of candidate plans</li>
- * </ul>
- */
-public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> {
-	
-	public static final int MAX_DYNAMIC_PATH_COST_WEIGHT = 100;
-	
-	// --------------------------------------------------------------------------------------------
-	//                                      Members
-	// --------------------------------------------------------------------------------------------
-
-	private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...)
-	
-	private List<String> broadcastConnectionNames = new ArrayList<String>(); // the broadcast inputs names of this node
-	
-	private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node
-	
-	private List<DagConnection> outgoingConnections; // The links to succeeding nodes
-
-	private InterestingProperties intProps; // the interesting properties of this node
-	
-	// --------------------------------- Branch Handling ------------------------------------------
-
-	protected List<UnclosedBranchDescriptor> openBranches; // stack of branches in the sub-graph that are not joined
-	
-	protected Set<OptimizerNode> closedBranchingNodes; 	// stack of branching nodes which have already been closed
-	
-	protected List<OptimizerNode> hereJoinedBranches;	// the branching nodes (node with multiple outputs)
-	// 										that are partially joined (through multiple inputs or broadcast vars)
-
-	// ---------------------------- Estimates and Annotations -------------------------------------
-	
-	protected long estimatedOutputSize = -1; // the estimated size of the output (bytes)
-
-	protected long estimatedNumRecords = -1; // the estimated number of key/value pairs in the output
-	
-	protected Set<FieldSet> uniqueFields; // set of attributes that will always be unique after this node
-
-	// --------------------------------- General Parameters ---------------------------------------
-	
-	private int parallelism = -1; // the number of parallel instances of this node
-	
-	private long minimalMemoryPerSubTask = -1;
-
-	protected int id = -1; 				// the id for this node.
-	
-	protected int costWeight = 1;		// factor to weight the costs for dynamic paths
-	
-	protected boolean onDynamicPath;
-	
-	protected List<PlanNode> cachedPlans;	// cache candidates, because they may be accessed repeatedly
-
-	// ------------------------------------------------------------------------
-	//                      Constructor / Setup
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new optimizer node that represents the given program operator.
-	 * 
-	 * @param op The operator that the node represents.
-	 */
-	public OptimizerNode(Operator<?> op) {
-		this.operator = op;
-		readStubAnnotations();
-	}
-	
-	protected OptimizerNode(OptimizerNode toCopy) {
-		this.operator = toCopy.operator;
-		this.intProps = toCopy.intProps;
-		
-		this.openBranches = toCopy.openBranches;
-		this.closedBranchingNodes = toCopy.closedBranchingNodes;
-		
-		this.estimatedOutputSize = toCopy.estimatedOutputSize;
-		this.estimatedNumRecords = toCopy.estimatedNumRecords;
-		
-		this.parallelism = toCopy.parallelism;
-		this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask;
-		
-		this.id = toCopy.id;
-		this.costWeight = toCopy.costWeight;
-		this.onDynamicPath = toCopy.onDynamicPath;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Methods specific to unary- / binary- / special nodes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the name of this node, which is the name of the function/operator, or
-	 * data source / data sink.
-	 * 
-	 * @return The node name.
-	 */
-	public abstract String getName();
-
-	/**
-	 * This function connects the predecessors to this operator.
-	 *
-	 * @param operatorToNode The map from program operators to optimizer nodes.
-	 * @param defaultExchangeMode The data exchange mode to use, if the operator does not
-	 *                            specify one.
-	 */
-	public abstract void setInput(Map<Operator<?>, OptimizerNode> operatorToNode,
-									ExecutionMode defaultExchangeMode);
-
-	/**
-	 * This function connects the operators that produce the broadcast inputs to this operator.
-	 *
-	 * @param operatorToNode The map from program operators to optimizer nodes.
-	 * @param defaultExchangeMode The data exchange mode to use, if the operator does not
-	 *                            specify one.
-	 *
-	 * @throws CompilerException
-	 */
-	public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) {
-		// skip for Operators that don't support broadcast variables 
-		if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) {
-			return;
-		}
-
-		// get all broadcast inputs
-		AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());
-
-		// create connections and add them
-		for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
-			OptimizerNode predecessor = operatorToNode.get(input.getValue());
-			DagConnection connection = new DagConnection(predecessor, this,
-															ShipStrategyType.BROADCAST, defaultExchangeMode);
-			addBroadcastConnection(input.getKey(), connection);
-			predecessor.addOutgoingConnection(connection);
-		}
-	}
-
-	/**
-	 * Gets all incoming connections of this node.
-	 * This method needs to be overridden by subclasses to return the children.
-	 * 
-	 * @return The list of incoming connections.
-	 */
-	public abstract List<DagConnection> getIncomingConnections();
-
-	/**
-	 * Tells the node to compute the interesting properties for its inputs. The interesting properties
-	 * for the node itself must have been computed before.
-	 * The node must then see how many of interesting properties it preserves and add its own.
-	 * 
-	 * @param estimator The {@code CostEstimator} instance to use for plan cost estimation.
-	 */
-	public abstract void computeInterestingPropertiesForInputs(CostEstimator estimator);
-
-	/**
-	 * This method causes the node to compute the description of open branches in its sub-plan. An open branch
-	 * describes, that a (transitive) child node had multiple outputs, which have not all been re-joined in the
-	 * sub-plan. This method needs to set the <code>openBranches</code> field to a stack of unclosed branches, the
-	 * latest one top. A branch is considered closed, if some later node sees all of the branching node's outputs,
-	 * no matter if there have been more branches to different paths in the meantime.
-	 */
-	public abstract void computeUnclosedBranchStack();
-	
-	
-	protected List<UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastInputs(
-															List<UnclosedBranchDescriptor> branchesSoFar)
-	{
-		// handle the data flow branching for the broadcast inputs
-		for (DagConnection broadcastInput : getBroadcastConnections()) {
-			OptimizerNode bcSource = broadcastInput.getSource();
-			addClosedBranches(bcSource.closedBranchingNodes);
-			
-			List<UnclosedBranchDescriptor> bcBranches = bcSource.getBranchesForParent(broadcastInput);
-			
-			ArrayList<UnclosedBranchDescriptor> mergedBranches = new ArrayList<UnclosedBranchDescriptor>();
-			mergeLists(branchesSoFar, bcBranches, mergedBranches, true);
-			branchesSoFar = mergedBranches.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : mergedBranches;
-		}
-		
-		return branchesSoFar;
-	}
-
-	/**
- * Computes the plan alternatives for this node, and implicitly for all nodes that are children of
-	 * this node. This method must determine for each alternative the global and local properties
-	 * and the costs. This method may recursively call <code>getAlternatives()</code> on its children
-	 * to get their plan alternatives, and build its own alternatives on top of those.
-	 * 
-	 * @param estimator
-	 *        The cost estimator used to estimate the costs of each plan alternative.
-	 * @return A list containing all plan alternatives.
-	 */
-	public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator);
-
-	/**
-	 * This method implements the visit of a depth-first graph traversing visitor. Implementers must first
-	 * call the <code>preVisit()</code> method, then hand the visitor to their children, and finally call
-	 * the <code>postVisit()</code> method.
-	 * 
-	 * @param visitor
-	 *        The graph traversing visitor.
-	 * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
-	 */
-	@Override
-	public abstract void accept(Visitor<OptimizerNode> visitor);
-
-	public abstract SemanticProperties getSemanticProperties();
-
-	// ------------------------------------------------------------------------
-	//                          Getters / Setters
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Iterable<OptimizerNode> getPredecessors() {
-		List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();
-
-		for (DagConnection dagConnection : getIncomingConnections()) {
-			allPredecessors.add(dagConnection.getSource());
-		}
-		
-		for (DagConnection conn : getBroadcastConnections()) {
-			allPredecessors.add(conn.getSource());
-		}
-		
-		return allPredecessors;
-	}
-	
-	/**
-	 * Gets the ID of this node. If the id has not yet been set, this method returns -1;
-	 * 
-	 * @return This node's id, or -1, if not yet set.
-	 */
-	public int getId() {
-		return this.id;
-	}
-
-	/**
-	 * Sets the ID of this node.
-	 * 
-	 * @param id
-	 *        The id for this node.
-	 */
-	public void initId(int id) {
-		if (id <= 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		if (this.id == -1) {
-			this.id = id;
-		} else {
-			throw new IllegalStateException("Id has already been initialized.");
-		}
-	}
-
-	/**
-	 * Adds the broadcast connection identified by the given {@code name} to this node.
-	 * 
-	 * @param broadcastConnection The connection to add.
-	 */
-	public void addBroadcastConnection(String name, DagConnection broadcastConnection) {
-		this.broadcastConnectionNames.add(name);
-		this.broadcastConnections.add(broadcastConnection);
-	}
-
-	/**
-	 * Return the list of names associated with broadcast inputs for this node.
-	 */
-	public List<String> getBroadcastConnectionNames() {
-		return this.broadcastConnectionNames;
-	}
-
-	/**
-	 * Return the list of inputs associated with broadcast variables for this node.
-	 */
-	public List<DagConnection> getBroadcastConnections() {
-		return this.broadcastConnections;
-	}
-
-	/**
-	 * Adds a new outgoing connection to this node.
-	 * 
-	 * @param connection
-	 *        The connection to add.
-	 */
-	public void addOutgoingConnection(DagConnection connection) {
-		if (this.outgoingConnections == null) {
-			this.outgoingConnections = new ArrayList<DagConnection>();
-		} else {
-			if (this.outgoingConnections.size() == 64) {
-				throw new CompilerException("Cannot currently handle nodes with more than 64 outputs.");
-			}
-		}
-
-		this.outgoingConnections.add(connection);
-	}
-
-	/**
-	 * The list of outgoing connections from this node to succeeding tasks.
-	 * 
-	 * @return The list of outgoing connections.
-	 */
-	public List<DagConnection> getOutgoingConnections() {
-		return this.outgoingConnections;
-	}
-
-	/**
-	 * Gets the operator represented by this optimizer node.
-	 * 
-	 * @return This node's operator.
-	 */
-	public Operator<?> getOperator() {
-		return this.operator;
-	}
-
-	/**
-	 * Gets the parallelism for the operator represented by this optimizer node.
-	 * The parallelism denotes how many parallel instances of the operator on will be
-	 * spawned during the execution. If this value is <code>-1</code>, then the system will take
-	 * the default number of parallel instances.
-	 * 
-	 * @return The parallelism of the operator.
-	 */
-	public int getParallelism() {
-		return this.parallelism;
-	}
-
-	/**
-	 * Sets the parallelism for this optimizer node.
-	 * The parallelism denotes how many parallel instances of the operator will be
-	 * spawned during the execution. If this value is set to <code>-1</code>, then the system will take
-	 * the default number of parallel instances.
-	 * 
-	 * @param parallelism The parallelism to set.
-	 * @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
-	 */
-	public void setDegreeOfParallelism(int parallelism) {
-		if (parallelism < 1 && parallelism != -1) {
-			throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
-		}
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the amount of memory that all subtasks of this task have jointly available.
-	 * 
-	 * @return The total amount of memory across all subtasks.
-	 */
-	public long getMinimalMemoryAcrossAllSubTasks() {
-		return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.parallelism;
-	}
-	
-	public boolean isOnDynamicPath() {
-		return this.onDynamicPath;
-	}
-	
-	public void identifyDynamicPath(int costWeight) {
-		boolean anyDynamic = false;
-		boolean allDynamic = true;
-		
-		for (DagConnection conn : getIncomingConnections()) {
-			boolean dynamicIn = conn.isOnDynamicPath();
-			anyDynamic |= dynamicIn;
-			allDynamic &= dynamicIn;
-		}
-		
-		for (DagConnection conn : getBroadcastConnections()) {
-			boolean dynamicIn = conn.isOnDynamicPath();
-			anyDynamic |= dynamicIn;
-			allDynamic &= dynamicIn;
-		}
-		
-		if (anyDynamic) {
-			this.onDynamicPath = true;
-			this.costWeight = costWeight;
-			if (!allDynamic) {
-				// this node joins static and dynamic path.
-				// mark the connections where the source is not dynamic as cached
-				for (DagConnection conn : getIncomingConnections()) {
-					if (!conn.getSource().isOnDynamicPath()) {
-						conn.setMaterializationMode(conn.getMaterializationMode().makeCached());
-					}
-				}
-				
-				// broadcast variables are always cached, because they stay unchanged available in the
-				// runtime context of the functions
-			}
-		}
-	}
-	
-	public int getCostWeight() {
-		return this.costWeight;
-	}
-	
-	public int getMaxDepth() {
-		int maxDepth = 0;
-		for (DagConnection conn : getIncomingConnections()) {
-			maxDepth = Math.max(maxDepth, conn.getMaxDepth());
-		}
-		for (DagConnection conn : getBroadcastConnections()) {
-			maxDepth = Math.max(maxDepth, conn.getMaxDepth());
-		}
-		
-		return maxDepth;
-	}
-
-	/**
-	 * Gets the properties that are interesting for this node to produce.
-	 * 
-	 * @return The interesting properties for this node, or null, if not yet computed.
-	 */
-	public InterestingProperties getInterestingProperties() {
-		return this.intProps;
-	}
-
-	@Override
-	public long getEstimatedOutputSize() {
-		return this.estimatedOutputSize;
-	}
-
-	@Override
-	public long getEstimatedNumRecords() {
-		return this.estimatedNumRecords;
-	}
-	
-	public void setEstimatedOutputSize(long estimatedOutputSize) {
-		this.estimatedOutputSize = estimatedOutputSize;
-	}
-
-	public void setEstimatedNumRecords(long estimatedNumRecords) {
-		this.estimatedNumRecords = estimatedNumRecords;
-	}
-	
-	@Override
-	public float getEstimatedAvgWidthPerOutputRecord() {
-		if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) {
-			return ((float) this.estimatedOutputSize) / this.estimatedNumRecords;
-		} else {
-			return -1.0f;
-		}
-	}
-
-	/**
-	 * Checks whether this node has branching output. A node's output is branched, if it has more
-	 * than one output connection.
-	 * 
-	 * @return True, if the node's output branches. False otherwise.
-	 */
-	public boolean isBranching() {
-		return getOutgoingConnections() != null && getOutgoingConnections().size() > 1;
-	}
-
-	public void markAllOutgoingConnectionsAsPipelineBreaking() {
-		if (this.outgoingConnections == null) {
-			throw new IllegalStateException("The outgoing connections have not yet been initialized.");
-		}
-		for (DagConnection conn : getOutgoingConnections()) {
-			conn.markBreaksPipeline();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//                              Miscellaneous
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Checks, if all outgoing connections have their interesting properties set from their target nodes.
-	 * 
-	 * @return True, if on all outgoing connections, the interesting properties are set. False otherwise.
-	 */
-	public boolean haveAllOutputConnectionInterestingProperties() {
-		for (DagConnection conn : getOutgoingConnections()) {
-			if (conn.getInterestingProperties() == null) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	/**
-	 * Computes all the interesting properties that are relevant to this node. The interesting
-	 * properties are a union of the interesting properties on each outgoing connection.
-	 * However, if two interesting properties on the outgoing connections overlap,
-	 * the interesting properties will occur only once in this set. For that, this
-	 * method deduplicates and merges the interesting properties.
-	 * This method returns copies of the original interesting properties objects and
-	 * leaves the original objects, contained by the connections, unchanged.
-	 */
-	public void computeUnionOfInterestingPropertiesFromSuccessors() {
-		List<DagConnection> conns = getOutgoingConnections();
-		if (conns.size() == 0) {
-			// no incoming, we have none ourselves
-			this.intProps = new InterestingProperties();
-		} else {
-			this.intProps = conns.get(0).getInterestingProperties().clone();
-			for (int i = 1; i < conns.size(); i++) {
-				this.intProps.addInterestingProperties(conns.get(i).getInterestingProperties());
-			}
-		}
-		this.intProps.dropTrivials();
-	}
-	
-	public void clearInterestingProperties() {
-		this.intProps = null;
-		for (DagConnection conn : getIncomingConnections()) {
-			conn.clearInterestingProperties();
-		}
-		for (DagConnection conn : getBroadcastConnections()) {
-			conn.clearInterestingProperties();
-		}
-	}
-	
-	/**
-	 * Causes this node to compute its output estimates (such as number of rows, size in bytes)
-	 * based on the inputs and the compiler hints. The compiler hints are instantiated with conservative
-	 * default values which are used if no other values are provided. Nodes may access the statistics to
-	 * determine relevant information.
-	 * 
-	 * @param statistics
-	 *        The statistics object which may be accessed to get statistical information.
-	 *        The parameter may be null, if no statistics are available.
-	 */
-	public void computeOutputEstimates(DataStatistics statistics) {
-		// sanity checking
-		for (DagConnection c : getIncomingConnections()) {
-			if (c.getSource() == null) {
-				throw new CompilerException("Bug: Estimate computation called before inputs have been set.");
-			}
-		}
-		
-		// let every operator do its computation
-		computeOperatorSpecificDefaultEstimates(statistics);
-		
-		if (this.estimatedOutputSize < 0) {
-			this.estimatedOutputSize = -1;
-		}
-		if (this.estimatedNumRecords < 0) {
-			this.estimatedNumRecords = -1;
-		}
-		
-		// overwrite default estimates with hints, if given
-		if (getOperator() == null || getOperator().getCompilerHints() == null) {
-			return ;
-		}
-		
-		CompilerHints hints = getOperator().getCompilerHints();
-		if (hints.getOutputSize() >= 0) {
-			this.estimatedOutputSize = hints.getOutputSize();
-		}
-		
-		if (hints.getOutputCardinality() >= 0) {
-			this.estimatedNumRecords = hints.getOutputCardinality();
-		}
-		
-		if (hints.getFilterFactor() >= 0.0f) {
-			if (this.estimatedNumRecords >= 0) {
-				this.estimatedNumRecords = (long) (this.estimatedNumRecords * hints.getFilterFactor());
-				
-				if (this.estimatedOutputSize >= 0) {
-					this.estimatedOutputSize = (long) (this.estimatedOutputSize * hints.getFilterFactor());
-				}
-			}
-			else if (this instanceof SingleInputNode) {
-				OptimizerNode pred = ((SingleInputNode) this).getPredecessorNode();
-				if (pred != null && pred.getEstimatedNumRecords() >= 0) {
-					this.estimatedNumRecords = (long) (pred.getEstimatedNumRecords() * hints.getFilterFactor());
-				}
-			}
-		}
-		
-		// use the width to infer the cardinality (given size) and vice versa
-		if (hints.getAvgOutputRecordSize() >= 1) {
-			// the estimated number of rows based on size
-			if (this.estimatedNumRecords == -1 && this.estimatedOutputSize >= 0) {
-				this.estimatedNumRecords = (long) (this.estimatedOutputSize / hints.getAvgOutputRecordSize());
-			}
-			else if (this.estimatedOutputSize == -1 && this.estimatedNumRecords >= 0) {
-				this.estimatedOutputSize = (long) (this.estimatedNumRecords * hints.getAvgOutputRecordSize());
-			}
-		}
-	}
-	
-	protected abstract void computeOperatorSpecificDefaultEstimates(DataStatistics statistics);
-	
-	// ------------------------------------------------------------------------
-	// Reading of stub annotations
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Reads all stub annotations, i.e. which fields remain constant, what cardinality bounds the
-	 * functions have, which fields remain unique.
-	 */
-	protected void readStubAnnotations() {
-		readUniqueFieldsAnnotation();
-	}
-	
-	protected void readUniqueFieldsAnnotation() {
-		if (this.operator.getCompilerHints() != null) {
-			Set<FieldSet> uniqueFieldSets = operator.getCompilerHints().getUniqueFields();
-			if (uniqueFieldSets != null) {
-				if (this.uniqueFields == null) {
-					this.uniqueFields = new HashSet<FieldSet>();
-				}
-				this.uniqueFields.addAll(uniqueFieldSets);
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	// Access of stub annotations
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the FieldSets which are unique in the output of the node. 
-	 */
-	public Set<FieldSet> getUniqueFields() {
-		return this.uniqueFields == null ? Collections.<FieldSet>emptySet() : this.uniqueFields;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                    Pruning
-	// --------------------------------------------------------------------------------------------
-	
-	protected void prunePlanAlternatives(List<PlanNode> plans) {
-		if (plans.isEmpty()) {
-			throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
-		}
-		// shortcut for the simple case
-		if (plans.size() == 1) {
-			return;
-		}
-		
-		// we can only compare plan candidates that made equal choices
-		// at the branching points. for each choice at a branching point,
-		// we need to keep the cheapest (wrt. interesting properties).
-		// if we do not keep candidates for each branch choice, we might not
-		// find branch compatible candidates when joining the branches back.
-		
-		// for pruning, we are quasi AFTER the node, so in the presence of
-	// branches, we need to form the per-branch-choice groups by the choice
-		// they made at the latest un-joined branching node. Note that this is
-		// different from the check for branch compatibility of candidates, as
-		// this happens on the input sub-plans and hence BEFORE the node (therefore
-	// it is relevant to find the latest (partially) joined branch point).
-		
-		if (this.openBranches == null || this.openBranches.isEmpty()) {
-			prunePlanAlternativesWithCommonBranching(plans);
-		} else {
-			// partition the candidates into groups that made the same sub-plan candidate
-			// choice at the latest unclosed branch point
-			
-			final OptimizerNode[] branchDeterminers = new OptimizerNode[this.openBranches.size()];
-			
-			for (int i = 0; i < branchDeterminers.length; i++) {
-				branchDeterminers[i] = this.openBranches.get(this.openBranches.size() - 1 - i).getBranchingNode();
-			}
-			
-			// this sorter sorts by the candidate choice at the branch point
-			Comparator<PlanNode> sorter = new Comparator<PlanNode>() {
-				
-				@Override
-				public int compare(PlanNode o1, PlanNode o2) {
-					for (OptimizerNode branchDeterminer : branchDeterminers) {
-						PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminer);
-						PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminer);
-						int hash1 = System.identityHashCode(n1);
-						int hash2 = System.identityHashCode(n2);
-
-						if (hash1 != hash2) {
-							return hash1 - hash2;
-						}
-					}
-					return 0;
-				}
-			};
-			Collections.sort(plans, sorter);
-			
-			List<PlanNode> result = new ArrayList<PlanNode>();
-			List<PlanNode> turn = new ArrayList<PlanNode>();
-			
-			final PlanNode[] determinerChoice = new PlanNode[branchDeterminers.length];
-
-			while (!plans.isEmpty()) {
-				// take one as the determiner
-				turn.clear();
-				PlanNode determiner = plans.remove(plans.size() - 1);
-				turn.add(determiner);
-				
-				for (int i = 0; i < determinerChoice.length; i++) {
-					determinerChoice[i] = determiner.getCandidateAtBranchPoint(branchDeterminers[i]);
-				}
-
-				// go backwards through the plans and find all that are equal
-				boolean stillEqual = true;
-				for (int k = plans.size() - 1; k >= 0 && stillEqual; k--) {
-					PlanNode toCheck = plans.get(k);
-					
-					for (int i = 0; i < branchDeterminers.length; i++) {
-						PlanNode checkerChoice = toCheck.getCandidateAtBranchPoint(branchDeterminers[i]);
-					
-						if (checkerChoice != determinerChoice[i]) {
-							// not the same anymore
-							stillEqual = false;
-							break;
-						}
-					}
-					
-					if (stillEqual) {
-						// the same
-						plans.remove(k);
-						turn.add(toCheck);
-					}
-				}
-
-				// now that we have only plans with the same branch alternatives, prune!
-				if (turn.size() > 1) {
-					prunePlanAlternativesWithCommonBranching(turn);
-				}
-				result.addAll(turn);
-			}
-
-			// after all turns are complete
-			plans.clear();
-			plans.addAll(result);
-		}
-	}
-	
-	protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans) {
-		// for each interesting property, which plans are cheapest
-		final RequestedGlobalProperties[] gps = this.intProps.getGlobalProperties().toArray(
-							new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
-		final RequestedLocalProperties[] lps = this.intProps.getLocalProperties().toArray(
-							new RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
-		
-		final PlanNode[][] toKeep = new PlanNode[gps.length][];
-		final PlanNode[] cheapestForGlobal = new PlanNode[gps.length];
-		
-		
-		PlanNode cheapest = null; // the overall cheapest plan
-
-		// go over all plans from the list
-		for (PlanNode candidate : plans) {
-			// check if that plan is the overall cheapest
-			if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
-				cheapest = candidate;
-			}
-
-			// find the interesting global properties that this plan matches
-			for (int i = 0; i < gps.length; i++) {
-				if (gps[i].isMetBy(candidate.getGlobalProperties())) {
-					// the candidate meets the global property requirements. That means
-					// it has a chance that its local properties are re-used (they would be
-					// destroyed if global properties need to be established)
-					
-					if (cheapestForGlobal[i] == null || (cheapestForGlobal[i].getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
-						cheapestForGlobal[i] = candidate;
-					}
-					
-					final PlanNode[] localMatches;
-					if (toKeep[i] == null) {
-						localMatches = new PlanNode[lps.length];
-						toKeep[i] = localMatches;
-					} else {
-						localMatches = toKeep[i];
-					}
-					
-					for (int k = 0; k < lps.length; k++) {
-						if (lps[k].isMetBy(candidate.getLocalProperties())) {
-							final PlanNode previous = localMatches[k];
-							if (previous == null || previous.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
-								// this one is cheaper!
-								localMatches[k] = candidate;
-							}
-						}
-					}
-				}
-			}
-		}
-
-		// all plans are set now
-		plans.clear();
-
-		// add the cheapest plan
-		if (cheapest != null) {
-			plans.add(cheapest);
-			cheapest.setPruningMarker(); // remember that that plan is in the set
-		}
-
-		// add all others, which are optimal for some interesting properties
-		for (int i = 0; i < gps.length; i++) {
-			if (toKeep[i] != null) {
-				final PlanNode[] localMatches = toKeep[i];
-				for (final PlanNode n : localMatches) {
-					if (n != null && !n.isPruneMarkerSet()) {
-						n.setPruningMarker();
-						plans.add(n);
-					}
-				}
-			}
-			if (cheapestForGlobal[i] != null) {
-				final PlanNode n = cheapestForGlobal[i];
-				if (!n.isPruneMarkerSet()) {
-					n.setPruningMarker();
-					plans.add(n);
-				}
-			}
-		}
-	}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	//                       Handling of branches
-	// --------------------------------------------------------------------------------------------
-
-	public boolean hasUnclosedBranches() {
-		return this.openBranches != null && !this.openBranches.isEmpty();
-	}
-
-	public Set<OptimizerNode> getClosedBranchingNodes() {
-		return this.closedBranchingNodes;
-	}
-	
-	public List<UnclosedBranchDescriptor> getOpenBranches() {
-		return this.openBranches;
-	}
-
-
-	protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent) {
-		if (this.outgoingConnections.size() == 1) {
-			// return our own stack of open branches, because nothing is added
-			if (this.openBranches == null || this.openBranches.isEmpty()) {
-				return Collections.emptyList();
-			} else {
-				return new ArrayList<UnclosedBranchDescriptor>(this.openBranches);
-			}
-		}
-		else if (this.outgoingConnections.size() > 1) {
-			// we branch add a branch info to the stack
-			List<UnclosedBranchDescriptor> branches = new ArrayList<UnclosedBranchDescriptor>(4);
-			if (this.openBranches != null) {
-				branches.addAll(this.openBranches);
-			}
-
-			// find out, which output number the connection to the parent
-			int num;
-			for (num = 0; num < this.outgoingConnections.size(); num++) {
-				if (this.outgoingConnections.get(num) == toParent) {
-					break;
-				}
-			}
-			if (num >= this.outgoingConnections.size()) {
-				throw new CompilerException("Error in compiler: "
-					+ "Parent to get branch info for is not contained in the outgoing connections.");
-			}
-
-			// create the description and add it
-			long bitvector = 0x1L << num;
-			branches.add(new UnclosedBranchDescriptor(this, bitvector));
-			return branches;
-		}
-		else {
-			throw new CompilerException(
-				"Error in compiler: Cannot get branch info for successor in a node with no successors.");
-		}
-	}
-
-	
-	protected void removeClosedBranches(List<UnclosedBranchDescriptor> openList) {
-		if (openList == null || openList.isEmpty() || this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty()) {
-			return;
-		}
-		
-		Iterator<UnclosedBranchDescriptor> it = openList.iterator();
-		while (it.hasNext()) {
-			if (this.closedBranchingNodes.contains(it.next().getBranchingNode())) {
-				//this branch was already closed --> remove it from the list
-				it.remove();
-			}
-		}
-	}
-	
-	protected void addClosedBranches(Set<OptimizerNode> alreadyClosed) {
-		if (alreadyClosed == null || alreadyClosed.isEmpty()) {
-			return;
-		}
-		
-		if (this.closedBranchingNodes == null) { 
-			this.closedBranchingNodes = new HashSet<OptimizerNode>(alreadyClosed);
-		} else {
-			this.closedBranchingNodes.addAll(alreadyClosed);
-		}
-	}
-	
-	protected void addClosedBranch(OptimizerNode alreadyClosed) {
-		if (this.closedBranchingNodes == null) { 
-			this.closedBranchingNodes = new HashSet<OptimizerNode>();
-		}
-
-		this.closedBranchingNodes.add(alreadyClosed);
-	}
-	
-	/**
-	 * Checks whether to candidate plans for the sub-plan of this node are comparable. The two
-	 * alternative plans are comparable, if
-	 * 
-	 * a) There is no branch in the sub-plan of this node
-	 * b) Both candidates have the same candidate as the child at the last open branch. 
-	 * 
-	 * @param plan1 The root node of the first candidate plan.
-	 * @param plan2 The root node of the second candidate plan.
-	 * @return True if the nodes are branch compatible in the inputs.
-	 */
-	protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) {
-		if (plan1 == null || plan2 == null) {
-			throw new NullPointerException();
-		}
-		
-		// if there is no open branch, the children are always compatible.
-		// in most plans, that will be the dominant case
-		if (this.hereJoinedBranches == null || this.hereJoinedBranches.isEmpty()) {
-			return true;
-		}
-
-		for (OptimizerNode joinedBrancher : hereJoinedBranches) {
-			final PlanNode branch1Cand = plan1.getCandidateAtBranchPoint(joinedBrancher);
-			final PlanNode branch2Cand = plan2.getCandidateAtBranchPoint(joinedBrancher);
-			
-			if (branch1Cand != null && branch2Cand != null && branch1Cand != branch2Cand) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	/**
-	 * The node IDs are assigned in graph-traversal order (pre-order), hence, each list is
-	 * sorted by ID in ascending order and all consecutive lists start with IDs in ascending order.
-	 *
-	 * @param markJoinedBranchesAsPipelineBreaking True, if the
-	 */
-	protected final boolean mergeLists(List<UnclosedBranchDescriptor> child1open,
-										List<UnclosedBranchDescriptor> child2open,
-										List<UnclosedBranchDescriptor> result,
-										boolean markJoinedBranchesAsPipelineBreaking) {
-
-		//remove branches which have already been closed
-		removeClosedBranches(child1open);
-		removeClosedBranches(child2open);
-		
-		result.clear();
-		
-		// check how many open branches we have. the cases:
-		// 1) if both are null or empty, the result is null
-		// 2) if one side is null (or empty), the result is the other side.
-		// 3) both are set, then we need to merge.
-		if (child1open == null || child1open.isEmpty()) {
-			if(child2open != null && !child2open.isEmpty()) {
-				result.addAll(child2open);
-			}
-			return false;
-		}
-		
-		if (child2open == null || child2open.isEmpty()) {
-			result.addAll(child1open);
-			return false;
-		}
-
-		int index1 = child1open.size() - 1;
-		int index2 = child2open.size() - 1;
-		
-		boolean didCloseABranch = false;
-
-		// as both lists (child1open and child2open) are sorted in ascending ID order
-		// we can do a merge-join-like loop which preserved the order in the result list
-		// and eliminates duplicates
-		while (index1 >= 0 || index2 >= 0) {
-			int id1 = -1;
-			int id2 = index2 >= 0 ? child2open.get(index2).getBranchingNode().getId() : -1;
-
-			while (index1 >= 0 && (id1 = child1open.get(index1).getBranchingNode().getId()) > id2) {
-				result.add(child1open.get(index1));
-				index1--;
-			}
-			while (index2 >= 0 && (id2 = child2open.get(index2).getBranchingNode().getId()) > id1) {
-				result.add(child2open.get(index2));
-				index2--;
-			}
-
-			// match: they share a common branching child
-			if (id1 == id2) {
-				didCloseABranch = true;
-				
-				// if this is the latest common child, remember it
-				OptimizerNode currBanchingNode = child1open.get(index1).getBranchingNode();
-				
-				long vector1 = child1open.get(index1).getJoinedPathsVector();
-				long vector2 = child2open.get(index2).getJoinedPathsVector();
-				
-				// check if this is the same descriptor, (meaning that it contains the same paths)
-				// if it is the same, add it only once, otherwise process the join of the paths
-				if (vector1 == vector2) {
-					result.add(child1open.get(index1));
-				}
-				else {
-					// we merge (re-join) a branch
-
-					// mark the branch as a point where we break the pipeline
-					if (markJoinedBranchesAsPipelineBreaking) {
-						currBanchingNode.markAllOutgoingConnectionsAsPipelineBreaking();
-					}
-
-					if (this.hereJoinedBranches == null) {
-						this.hereJoinedBranches = new ArrayList<OptimizerNode>(2);
-					}
-					this.hereJoinedBranches.add(currBanchingNode);
-
-					// see, if this node closes the branch
-					long joinedInputs = vector1 | vector2;
-
-					// this is 2^size - 1, which is all bits set at positions 0..size-1
-					long allInputs = (0x1L << currBanchingNode.getOutgoingConnections().size()) - 1;
-
-					if (joinedInputs == allInputs) {
-						// closed - we can remove it from the stack
-						addClosedBranch(currBanchingNode);
-					} else {
-						// not quite closed
-						result.add(new UnclosedBranchDescriptor(currBanchingNode, joinedInputs));
-					}
-				}
-
-				index1--;
-				index2--;
-			}
-		}
-
-		// merged. now we need to reverse the list, because we added the elements in reverse order
-		Collections.reverse(result);
-		return didCloseABranch;
-	}
-
-	@Override
-	public OptimizerNode getOptimizerNode() {
-		return this;
-	}
-	
-	@Override
-	public PlanNode getPlanNode() {
-		return null;
-	}
-	
-	@Override
-	public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() {
-		List<DumpableConnection<OptimizerNode>> allInputs = new ArrayList<DumpableConnection<OptimizerNode>>();
-		
-		allInputs.addAll(getIncomingConnections());
-		allInputs.addAll(getBroadcastConnections());
-		
-		return allInputs;
-	}
-	
-	@Override
-	public String toString() {
-		StringBuilder bld = new StringBuilder();
-
-		bld.append(getName());
-		bld.append(" (").append(getOperator().getName()).append(") ");
-
-		int i = 1; 
-		for (DagConnection conn : getIncomingConnections()) {
-			String shipStrategyName = conn.getShipStrategy() == null ? "null" : conn.getShipStrategy().name();
-			bld.append('(').append(i++).append(":").append(shipStrategyName).append(')');
-		}
-
-		return bld.toString();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Description of an unclosed branch. An unclosed branch is when the data flow branched (one operator's
-	 * result is consumed by multiple targets), but these different branches (targets) have not been joined
-	 * together.
-	 */
-	public static final class UnclosedBranchDescriptor {
-
-		protected OptimizerNode branchingNode;
-
-		protected long joinedPathsVector;
-
-		/**
-		 * Creates a new branching descriptor.
-		 *
-		 * @param branchingNode The node where the branch occurred (teh node with multiple outputs).
-		 * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor.
-		 *                          The bit vector is one, where the branch is tracked, zero otherwise.
-		 */
-		protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) {
-			this.branchingNode = branchingNode;
-			this.joinedPathsVector = joinedPathsVector;
-		}
-
-		public OptimizerNode getBranchingNode() {
-			return this.branchingNode;
-		}
-
-		public long getJoinedPathsVector() {
-			return this.joinedPathsVector;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + this.branchingNode.getOperator() + ") [" + this.joinedPathsVector + "]";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
deleted file mode 100644
index 5c811b0..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * The optimizer's internal representation of a <i>Partition</i> operator node.
- */
-public class PartitionNode extends SingleInputNode {
-
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	public PartitionNode(PartitionOperatorBase<?> operator) {
-		super(operator);
-		
-		OperatorDescriptorSingle descr = new PartitionDescriptor(
-					this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner());
-		this.possibleProperties = Collections.singletonList(descr);
-	}
-
-	@Override
-	public PartitionOperatorBase<?> getOperator() {
-		return (PartitionOperatorBase<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Partition";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// partitioning does not change the number of records
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-	}
-	
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class PartitionDescriptor extends OperatorDescriptorSingle {
-
-		private final PartitionMethod pMethod;
-		private final Partitioner<?> customPartitioner;
-		
-		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner) {
-			super(pKeys);
-			
-			this.pMethod = pMethod;
-			this.customPartitioner = customPartitioner;
-		}
-		
-		@Override
-		public DriverStrategy getStrategy() {
-			return DriverStrategy.UNARY_NO_OP;
-		}
-
-		@Override
-		public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-			return new SingleInputPlanNode(node, "Partition", in, DriverStrategy.UNARY_NO_OP);
-		}
-
-		@Override
-		protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-			RequestedGlobalProperties rgps = new RequestedGlobalProperties();
-			
-			switch (this.pMethod) {
-			case HASH:
-				rgps.setHashPartitioned(this.keys);
-				break;
-			case REBALANCE:
-				rgps.setForceRebalancing();
-				break;
-			case CUSTOM:
-				rgps.setCustomPartitioned(this.keys, this.customPartitioner);
-				break;
-			case RANGE:
-				throw new UnsupportedOperationException("Not yet supported");
-			default:
-				throw new IllegalArgumentException("Invalid partition method");
-			}
-			
-			return Collections.singletonList(rgps);
-		}
-
-		@Override
-		protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-			// partitioning does not require any local property.
-			return Collections.singletonList(new RequestedLocalProperties());
-		}
-		
-		@Override
-		public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-			// the partition node is a no-operation operation, such that all global properties are preserved.
-			return gProps;
-		}
-		
-		@Override
-		public LocalProperties computeLocalProperties(LocalProperties lProps) {
-			// the partition node is a no-operation operation, such that all global properties are preserved.
-			return lProps;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
deleted file mode 100644
index bbe4607..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-final class PlanCacheCleaner implements Visitor<OptimizerNode> {
-	
-	static final PlanCacheCleaner INSTANCE = new PlanCacheCleaner();
-
-	@Override
-	public boolean preVisit(OptimizerNode visitable) {
-		if (visitable.cachedPlans != null && visitable.isOnDynamicPath()) {
-			visitable.cachedPlans = null;
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
deleted file mode 100644
index 1477038..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.AllReduceProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.operators.ReduceProperties;
-
-/**
- * The Optimizer representation of a <i>Reduce</i> operator.
- */
-public class ReduceNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	private ReduceNode preReduceUtilityNode;
-	
-
-	public ReduceNode(ReduceOperatorBase<?, ?> operator) {
-		super(operator);
-		
-		if (this.keys == null) {
-			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
-		}
-		
-		OperatorDescriptorSingle props = this.keys == null ?
-			new AllReduceProperties() :
-			new ReduceProperties(this.keys, operator.getCustomPartitioner());
-		
-		this.possibleProperties = Collections.singletonList(props);
-	}
-	
-	public ReduceNode(ReduceNode reducerToCopyForCombiner) {
-		super(reducerToCopyForCombiner);
-		
-		this.possibleProperties = Collections.emptyList();
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public ReduceOperatorBase<?, ?> getOperator() {
-		return (ReduceOperatorBase<?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Reduce";
-	}
-	
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Estimates
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// no real estimates possible for a reducer.
-	}
-	
-	public ReduceNode getCombinerUtilityNode() {
-		if (this.preReduceUtilityNode == null) {
-			this.preReduceUtilityNode = new ReduceNode(this);
-			
-			// we conservatively assume the combiner returns the same data size as it consumes 
-			this.preReduceUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-			this.preReduceUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		}
-		return this.preReduceUtilityNode;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
deleted file mode 100644
index cc12bb8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
-import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.collect.Sets;
-
-/**
- * A node in the optimizer's program representation for an operation with a single input.
- * 
- * This class contains all the generic logic for handling branching flows, as well as to
- * enumerate candidate execution plans. The subclasses for specific operators simply add logic
- * for cost estimates and specify possible strategies for their execution.
- */
-public abstract class SingleInputNode extends OptimizerNode {
-	
-	protected final FieldSet keys; 			// The set of key fields
-	
-	protected DagConnection inConn; 		// the input of the node
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new node with a single input for the optimizer plan.
-	 * 
-	 * @param programOperator The PACT that the node represents.
-	 */
-	protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
-		super(programOperator);
-		
-		int[] k = programOperator.getKeyColumns(0);
-		this.keys = k == null || k.length == 0 ? null : new FieldSet(k);
-	}
-	
-	protected SingleInputNode(FieldSet keys) {
-		super(NoOpUnaryUdfOp.INSTANCE);
-		this.keys = keys;
-	}
-	
-	protected SingleInputNode() {
-		super(NoOpUnaryUdfOp.INSTANCE);
-		this.keys = null;
-	}
-	
-	protected SingleInputNode(SingleInputNode toCopy) {
-		super(toCopy);
-		
-		this.keys = toCopy.keys;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public SingleInputOperator<?, ?, ?> getOperator() {
-		return (SingleInputOperator<?, ?, ?>) super.getOperator();
-	}
-	
-	/**
-	 * Gets the input of this operator.
-	 * 
-	 * @return The input.
-	 */
-	public DagConnection getIncomingConnection() {
-		return this.inConn;
-	}
-
-	/**
-	 * Sets the connection through which this node receives its input.
-	 * 
-	 * @param inConn The input connection to set.
-	 */
-	public void setIncomingConnection(DagConnection inConn) {
-		this.inConn = inConn;
-	}
-	
-	/**
-	 * Gets the predecessor of this node.
-	 * 
-	 * @return The predecessor of this node. 
-	 */
-	public OptimizerNode getPredecessorNode() {
-		if (this.inConn != null) {
-			return this.inConn.getSource();
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public List<DagConnection> getIncomingConnections() {
-		return Collections.singletonList(this.inConn);
-	}
-	
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return getOperator().getSemanticProperties();
-	}
-	
-
-	@Override
-	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
-			throws CompilerException
-	{
-		// see if an internal hint dictates the strategy to use
-		final Configuration conf = getOperator().getParameters();
-		final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
-		final ShipStrategyType preSet;
-		
-		if (shipStrategy != null) {
-			if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
-				preSet = ShipStrategyType.PARTITION_HASH;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
-				preSet = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
-				preSet = ShipStrategyType.FORWARD;
-			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
-				preSet = ShipStrategyType.PARTITION_RANDOM;
-			} else {
-				throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
-			}
-		} else {
-			preSet = null;
-		}
-		
-		// get the predecessor node
-		Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
-		
-		OptimizerNode pred;
-		DagConnection conn;
-		if (children == null) {
-			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
-		} else {
-			pred = contractToNode.get(children);
-			conn = new DagConnection(pred, this, defaultExchangeMode);
-			if (preSet != null) {
-				conn.setShipStrategy(preSet);
-			}
-		}
-		
-		// create the connection and add it
-		setIncomingConnection(conn);
-		pred.addOutgoingConnection(conn);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                             Properties and Optimization
-	// --------------------------------------------------------------------------------------------
-	
-	protected abstract List<OperatorDescriptorSingle> getPossibleProperties();
-	
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		// get what we inherit and what is preserved by our user code 
-		final InterestingProperties props = getInterestingProperties().filterByCodeAnnotations(this, 0);
-		
-		// add all properties relevant to this node
-		for (OperatorDescriptorSingle dps : getPossibleProperties()) {
-			for (RequestedGlobalProperties gp : dps.getPossibleGlobalProperties()) {
-				
-				if (gp.getPartitioning().isPartitionedOnKey()) {
-					// make sure that among the same partitioning types, we do not push anything down that has fewer key fields
-					
-					for (RequestedGlobalProperties contained : props.getGlobalProperties()) {
-						if (contained.getPartitioning() == gp.getPartitioning() && gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) {
-							props.getGlobalProperties().remove(contained);
-							break;
-						}
-					}
-				}
-				
-				props.addGlobalProperties(gp);
-			}
-			
-			for (RequestedLocalProperties lp : dps.getPossibleLocalProperties()) {
-				props.addLocalProperties(lp);
-			}
-		}
-		this.inConn.setInterestingProperties(props);
-		
-		for (DagConnection conn : getBroadcastConnections()) {
-			conn.setInterestingProperties(new InterestingProperties());
-		}
-	}
-	
-
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		// check if we have a cached version
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		}
-
-		boolean childrenSkippedDueToReplicatedInput = false;
-
-		// calculate alternative sub-plans for predecessor
-		final List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
-		final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
-		
-		// calculate alternative sub-plans for broadcast inputs
-		final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
-		List<DagConnection> broadcastConnections = getBroadcastConnections();
-		List<String> broadcastConnectionNames = getBroadcastConnectionNames();
-
-		for (int i = 0; i < broadcastConnections.size(); i++ ) {
-			DagConnection broadcastConnection = broadcastConnections.get(i);
-			String broadcastConnectionName = broadcastConnectionNames.get(i);
-			List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
-
-			// wrap the plan candidates in named channels
-			HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
-			for (PlanNode plan: broadcastPlanCandidates) {
-				NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
-				DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
-										ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
-				c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
-				broadcastChannels.add(c);
-			}
-			broadcastPlanChannels.add(broadcastChannels);
-		}
-
-		final RequestedGlobalProperties[] allValidGlobals;
-		{
-			Set<RequestedGlobalProperties> pairs = new HashSet<RequestedGlobalProperties>();
-			for (OperatorDescriptorSingle ods : getPossibleProperties()) {
-				pairs.addAll(ods.getPossibleGlobalProperties());
-			}
-			allValidGlobals = pairs.toArray(new RequestedGlobalProperties[pairs.size()]);
-		}
-		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
-		final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
-
-		final int dop = getParallelism();
-		final int inDop = getPredecessorNode().getParallelism();
-		final boolean dopChange = inDop != dop;
-
-		final boolean breaksPipeline = this.inConn.isBreakingPipeline();
-
-		// create all candidates
-		for (PlanNode child : subPlans) {
-
-			if (child.getGlobalProperties().isFullyReplicated()) {
-				// fully replicated input is always locally forwarded if DOP is not changed
-				if (dopChange) {
-					// can not continue with this child
-					childrenSkippedDueToReplicatedInput = true;
-					continue;
-				} else {
-					this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
-				}
-			}
-
-			if (this.inConn.getShipStrategy() == null) {
-				// pick the strategy ourselves
-				for (RequestedGlobalProperties igps: intGlobal) {
-					final Channel c = new Channel(child, this.inConn.getMaterializationMode());
-					igps.parameterizeChannel(c, dopChange, executionMode, breaksPipeline);
-					
-					// if the DOP changed, make sure that we cancel out properties, unless the
-					// ship strategy preserves/establishes them even under changing DOPs
-					if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
-						c.getGlobalProperties().reset();
-					}
-					
-					// check whether we meet any of the accepted properties
-					// we may remove this check, when we do a check to not inherit
-					// requested global properties that are incompatible with all possible
-					// requested properties
-					for (RequestedGlobalProperties rgps: allValidGlobals) {
-						if (rgps.isMetBy(c.getGlobalProperties())) {
-							c.setRequiredGlobalProps(rgps);
-							addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
-							break;
-						}
-					}
-				}
-			} else {
-				// hint fixed the strategy
-				final Channel c = new Channel(child, this.inConn.getMaterializationMode());
-				final ShipStrategyType shipStrategy = this.inConn.getShipStrategy();
-				final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);
-
-				if (this.keys != null) {
-					c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode);
-				} else {
-					c.setShipStrategy(shipStrategy, exMode);
-				}
-				
-				if (dopChange) {
-					c.adjustGlobalPropertiesForFullParallelismChange();
-				}
-
-				// check whether we meet any of the accepted properties
-				for (RequestedGlobalProperties rgps: allValidGlobals) {
-					if (rgps.isMetBy(c.getGlobalProperties())) {
-						addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator);
-						break;
-					}
-				}
-			}
-		}
-
-		if(outputPlans.isEmpty()) {
-			if(childrenSkippedDueToReplicatedInput) {
-				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
-			} else {
-				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
-			}
-		}
-
-		// cost and prune the plans
-		for (PlanNode node : outputPlans) {
-			estimator.costOperator(node);
-		}
-		prunePlanAlternatives(outputPlans);
-		outputPlans.trimToSize();
-
-		this.cachedPlans = outputPlans;
-		return outputPlans;
-	}
-	
-	protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
-			List<PlanNode> target, CostEstimator estimator)
-	{
-		for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
-			final Channel in = template.clone();
-			ilp.parameterizeChannel(in);
-			
-			// instantiate a candidate, if the instantiated local properties meet one possible local property set
-			outer:
-			for (OperatorDescriptorSingle dps: getPossibleProperties()) {
-				for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
-					if (ilps.isMetBy(in.getLocalProperties())) {
-						in.setRequiredLocalProps(ilps);
-						instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
-						break outer;
-					}
-				}
-			}
-		}
-	}
-
-	protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
-			List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
-	{
-		final PlanNode inputSource = in.getSource();
-		
-		for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
-			
-			boolean validCombination = true;
-			boolean requiresPipelinebreaker = false;
-			
-			// check whether the broadcast inputs use the same plan candidate at the branching point
-			for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
-				NamedChannel nc = broadcastChannelsCombination.get(i);
-				PlanNode bcSource = nc.getSource();
-				
-				// check branch compatibility against input
-				if (!areBranchCompatible(bcSource, inputSource)) {
-					validCombination = false;
-					break;
-				}
-				
-				// check branch compatibility against all other broadcast variables
-				for (int k = 0; k < i; k++) {
-					PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
-					
-					if (!areBranchCompatible(bcSource, otherBcSource)) {
-						validCombination = false;
-						break;
-					}
-				}
-				
-				// check if there is a common predecessor and whether there is a dam on the way to all common predecessors
-				if (this.hereJoinedBranches != null) {
-					for (OptimizerNode brancher : this.hereJoinedBranches) {
-						PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
-						
-						if (candAtBrancher == null) {
-							// closed branch between two broadcast variables
-							continue;
-						}
-						
-						SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
-						if (res == NOT_FOUND) {
-							throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
-						} else if (res == FOUND_SOURCE) {
-							requiresPipelinebreaker = true;
-							break;
-						} else if (res == FOUND_SOURCE_AND_DAM) {
-							// good
-						} else {
-							throw new CompilerException();
-						}
-					}
-				}
-			}
-			
-			if (!validCombination) {
-				continue;
-			}
-			
-			if (requiresPipelinebreaker) {
-				in.setTempMode(in.getTempMode().makePipelineBreaker());
-			}
-			
-			final SingleInputPlanNode node = dps.instantiate(in, this);
-			node.setBroadcastInputs(broadcastChannelsCombination);
-			
-			// compute how the strategy affects the properties
-			GlobalProperties gProps = in.getGlobalProperties().clone();
-			LocalProperties lProps = in.getLocalProperties().clone();
-			gProps = dps.computeGlobalProperties(gProps);
-			lProps = dps.computeLocalProperties(lProps);
-
-			SemanticProperties props = this.getSemanticProperties();
-			// filter by the user code field copies
-			gProps = gProps.filterBySemanticProperties(props, 0);
-			lProps = lProps.filterBySemanticProperties(props, 0);
-			
-			// apply
-			node.initProperties(gProps, lProps);
-			node.updatePropertiesWithUniqueSets(getUniqueFields());
-			target.add(node);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                     Branch Handling
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		addClosedBranches(getPredecessorNode().closedBranchingNodes);
-		List<UnclosedBranchDescriptor> fromInput = getPredecessorNode().getBranchesForParent(this.inConn);
-		
-		// handle the data flow branching for the broadcast inputs
-		List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(fromInput);
-		
-		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                     Miscellaneous
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void accept(Visitor<OptimizerNode> visitor) {
-		if (visitor.preVisit(this)) {
-			if (getPredecessorNode() != null) {
-				getPredecessorNode().accept(visitor);
-			} else {
-				throw new CompilerException();
-			}
-			for (DagConnection connection : getBroadcastConnections()) {
-				connection.getSource().accept(visitor);
-			}
-			visitor.postVisit(this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
deleted file mode 100644
index 40725ba..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.UtilSinkJoinOpDescriptor;
-import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
-import org.apache.flink.types.Nothing;
-
-/**
- * This class represents a utility node that is not part of the actual plan.
- * It is used for plans with multiple data sinks to transform it into a plan with
- * a single root node. That way, the code that makes sure no costs are double-counted and that
- * candidate selection works correctly with nodes that have multiple outputs is transparently reused.
- */
-public class SinkJoiner extends TwoInputNode {
-	
-	public SinkJoiner(OptimizerNode input1, OptimizerNode input2) {
-		super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
-
-		DagConnection conn1 = new DagConnection(input1, this, null, ExecutionMode.PIPELINED);
-		DagConnection conn2 = new DagConnection(input2, this, null, ExecutionMode.PIPELINED);
-		
-		this.input1 = conn1;
-		this.input2 = conn2;
-		
-		setDegreeOfParallelism(1);
-	}
-	
-	@Override
-	public String getName() {
-		return "Internal Utility Node";
-	}
-	
-	@Override
-	public List<DagConnection> getOutgoingConnections() {
-		return Collections.emptyList();
-	}
-	
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-		
-		addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-		addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-		
-		List<UnclosedBranchDescriptor> pred1branches = getFirstPredecessorNode().openBranches;
-		List<UnclosedBranchDescriptor> pred2branches = getSecondPredecessorNode().openBranches;
-		
-		// if the predecessors do not have branches, then we have multiple sinks that do not originate from
-		// a common data flow.
-		if (pred1branches == null || pred1branches.isEmpty()) {
-			
-			this.openBranches = (pred2branches == null || pred2branches.isEmpty()) ?
-					Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - disconnected flow
-					pred2branches;
-		}
-		else if (pred2branches == null || pred2branches.isEmpty()) {
-			this.openBranches = pred1branches;
-		}
-		else {
-			// copy the lists and merge
-			List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
-			List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
-			
-			ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
-			mergeLists(result1, result2, result, false);
-			
-			this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
-		}
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return Collections.<OperatorDescriptorDual>singletonList(new UtilSinkJoinOpDescriptor());
-	}
-
-	@Override
-	public void computeOutputEstimates(DataStatistics statistics) {
-		// nothing to be done here
-	}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// no estimates needed at this point
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
deleted file mode 100644
index 1292cf5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-
-/**
- * The optimizer's internal representation of the solution set of a workset iteration.
- */
-public class SolutionSetNode extends AbstractPartialSolutionNode {
-	
-	private final WorksetIterationNode iterationNode;
-	
-	
-	public SolutionSetNode(SolutionSetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
-		super(psph);
-		this.iterationNode = iterationNode;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput));
-	}
-	
-	public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
-		if (this.cachedPlans != null) {
-			return (SolutionSetPlanNode) this.cachedPlans.get(0);
-		} else {
-			throw new IllegalStateException();
-		}
-	}
-	
-	public WorksetIterationNode getIterationNode() {
-		return this.iterationNode;
-	}
-	
-	@Override
-	public void computeOutputEstimates(DataStatistics statistics) {
-		copyEstimates(this.iterationNode.getInitialSolutionSetPredecessorNode());
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the contract object for this data source node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public SolutionSetPlaceHolder<?> getOperator() {
-		return (SolutionSetPlaceHolder<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Solution Set";
-	}
-	
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		DagConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection();
-		OptimizerNode solutionSetSource = solutionSetInput.getSource();
-		
-		addClosedBranches(solutionSetSource.closedBranchingNodes);
-		List<UnclosedBranchDescriptor> fromInput = solutionSetSource.getBranchesForParent(solutionSetInput);
-		this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
deleted file mode 100644
index 83bc39a..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The optimizer's internal representation of a <i>SortPartition</i> operator node.
- */
-public class SortPartitionNode extends SingleInputNode {
-
-	private final List<OperatorDescriptorSingle> possibleProperties;
-
-	public SortPartitionNode(SortPartitionOperatorBase<?> operator) {
-		super(operator);
-		
-		OperatorDescriptorSingle descr = new SortPartitionDescriptor(operator.getPartitionOrdering());
-		this.possibleProperties = Collections.singletonList(descr);
-	}
-
-	@Override
-	public SortPartitionOperatorBase<?> getOperator() {
-		return (SortPartitionOperatorBase<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Sort-Partition";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// sorting does not change the number of records
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-	}
-	
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class SortPartitionDescriptor extends OperatorDescriptorSingle {
-
-		private Ordering partitionOrder;
-
-		public SortPartitionDescriptor(Ordering partitionOrder) {
-			this.partitionOrder = partitionOrder;
-		}
-		
-		@Override
-		public DriverStrategy getStrategy() {
-			return DriverStrategy.UNARY_NO_OP;
-		}
-
-		@Override
-		public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-			return new SingleInputPlanNode(node, "Sort-Partition", in, DriverStrategy.UNARY_NO_OP);
-		}
-
-		@Override
-		protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-			// sort partition does not require any global property
-			return Collections.singletonList(new RequestedGlobalProperties());
-		}
-
-		@Override
-		protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-			// set partition order as required local property
-			RequestedLocalProperties rlp = new RequestedLocalProperties();
-			rlp.setOrdering(this.partitionOrder);
-
-			return Collections.singletonList(rlp);
-		}
-		
-		@Override
-		public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-			// sort partition is a no-operation operation, such that all global properties are preserved.
-			return gProps;
-		}
-		
-		@Override
-		public LocalProperties computeLocalProperties(LocalProperties lProps) {
-			// sort partition is a no-operation operation, such that all global properties are preserved.
-			return lProps;
-		}
-	}
-}