Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:52 UTC

[13/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
new file mode 100644
index 0000000..df05b64
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the partial solution of a bulk iteration.
+ */
+public class BulkPartialSolutionPlanNode extends PlanNode {
+	
+	private static final Costs NO_COSTS = new Costs();
+	
+	private BulkIterationPlanNode containingIterationNode;
+	
+	private Channel initialInput;
+	
+	public Object postPassHelper;
+	
+	
+	public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, String nodeName,
+			GlobalProperties gProps, LocalProperties lProps,
+			Channel initialInput)
+	{
+		super(template, nodeName, DriverStrategy.NONE);
+		
+		this.globalProps = gProps;
+		this.localProps = lProps;
+		this.initialInput = initialInput;
+		
+		// the partial solution does not cost anything
+		this.nodeCosts = NO_COSTS;
+		this.cumulativeCosts = NO_COSTS;
+		
+		if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			
+			this.branchPlan.putAll(initialInput.getSource().branchPlan);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public BulkPartialSolutionNode getPartialSolutionNode() {
+		return (BulkPartialSolutionNode) this.template;
+	}
+	
+	public BulkIterationPlanNode getContainingIterationNode() {
+		return this.containingIterationNode;
+	}
+	
+	public void setContainingIterationNode(BulkIterationPlanNode containingIterationNode) {
+		this.containingIterationNode = containingIterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res == FOUND_SOURCE) {
+			return (this.initialInput.getLocalStrategy().dams() || 
+					this.initialInput.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+		}
+		else {
+			return NOT_FOUND;
+		}
+	}
+}
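
For illustration, a minimal, self-contained sketch (not part of this commit) of the three-valued
result used by hasDamOnPathDownTo(...) above. The enum constants match the names in the Flink
code; the combine(...) helper and its arguments are hypothetical and only illustrate how a node
folds the report of its input path together with its own damming behavior.

public class DamReportSketch {

    enum SourceAndDamReport { NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM }

    // Combines the report from the input path with the question
    // "does this node's local strategy, temp mode, or driver strategy dam the pipeline?"
    static SourceAndDamReport combine(SourceAndDamReport fromInput, boolean localDam) {
        switch (fromInput) {
            case FOUND_SOURCE_AND_DAM:
                return SourceAndDamReport.FOUND_SOURCE_AND_DAM;    // a dam was already found below
            case FOUND_SOURCE:
                return localDam ? SourceAndDamReport.FOUND_SOURCE_AND_DAM
                                : SourceAndDamReport.FOUND_SOURCE; // source found, dam decided here
            default:
                return SourceAndDamReport.NOT_FOUND;               // source not on this path
        }
    }

    public static void main(String[] args) {
        System.out.println(combine(SourceAndDamReport.FOUND_SOURCE, true));  // FOUND_SOURCE_AND_DAM
        System.out.println(combine(SourceAndDamReport.FOUND_SOURCE, false)); // FOUND_SOURCE
        System.out.println(combine(SourceAndDamReport.NOT_FOUND, true));     // NOT_FOUND
    }
}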

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
new file mode 100644
index 0000000..875d1c3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.EstimateProvider;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * A Channel represents the result produced by an operator and the data exchange
+ * before the consumption by the target operator.
+ *
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ *
+ * Data set characteristics:
+ * <ul>
+ *     <li>The "global properties" of the data, i.e., how the data is distributed across
+ *         partitions</li>
+ *     <li>The "required global properties" of the data, i.e., the global properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ *     <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
+ *     <li>The "required local properties" of the data, i.e., the local properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ * </ul>
+ *
+ * Data exchange parameters:
+ * <ul>
+ *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
+ *     <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
+ *     <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
+ *     <li>Several more...</li>
+ * </ul>
+ */
+public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
+	
+	private PlanNode source;
+	
+	private PlanNode target;
+
+	private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
+
+	private DataExchangeMode dataExchangeMode;
+	
+	private LocalStrategy localStrategy = LocalStrategy.NONE;
+	
+	private FieldList shipKeys;
+	
+	private FieldList localKeys;
+	
+	private boolean[] shipSortOrder;
+	
+	private boolean[] localSortOrder;
+	
+	private RequestedGlobalProperties requiredGlobalProps;
+	
+	private RequestedLocalProperties requiredLocalProps;
+	
+	private GlobalProperties globalProps;
+	
+	private LocalProperties localProps;
+	
+	private TypeSerializerFactory<?> serializer;
+	
+	private TypeComparatorFactory<?> shipStrategyComparator;
+	
+	private TypeComparatorFactory<?> localStrategyComparator;
+	
+	private DataDistribution dataDistribution;
+	
+	private Partitioner<?> partitioner;
+	
+	private TempMode tempMode;
+	
+	private double relativeTempMemory;
+	
+	private double relativeMemoryLocalStrategy;
+	
+	private int replicationFactor = 1;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public Channel(PlanNode sourceNode) {
+		this(sourceNode, null);
+	}
+	
+	public Channel(PlanNode sourceNode, TempMode tempMode) {
+		this.source = sourceNode;
+		this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                         Accessors
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the source of this Channel.
+	 *
+	 * @return The source.
+	 */
+	@Override
+	public PlanNode getSource() {
+		return this.source;
+	}
+	
+	/**
+	 * Sets the target of this Channel.
+	 *
+	 * @param target The target.
+	 */
+	public void setTarget(PlanNode target) {
+		this.target = target;
+	}
+	
+	/**
+	 * Gets the target of this Channel.
+	 *
+	 * @return The target.
+	 */
+	public PlanNode getTarget() {
+		return this.target;
+	}
+
+	public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, null, null, null, dataExchangeMode);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, null, dataExchangeMode);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								boolean[] sortDirection, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								boolean[] sortDirection, Partitioner<?> partitioner,
+								DataExchangeMode dataExchangeMode) {
+		this.shipStrategy = strategy;
+		this.shipKeys = keys;
+		this.shipSortOrder = sortDirection;
+		this.partitioner = partitioner;
+		this.dataExchangeMode = dataExchangeMode;
+		this.globalProps = null;		// reset the global properties
+	}
+
+	/**
+	 * Gets the data exchange mode (batch / streaming) to use for the data
+	 * exchange of this channel.
+	 *
+	 * @return The data exchange mode of this channel.
+	 */
+	public DataExchangeMode getDataExchangeMode() {
+		return dataExchangeMode;
+	}
+
+	public ShipStrategyType getShipStrategy() {
+		return this.shipStrategy;
+	}
+	
+	public FieldList getShipStrategyKeys() {
+		return this.shipKeys;
+	}
+	
+	public boolean[] getShipStrategySortOrder() {
+		return this.shipSortOrder;
+	}
+	
+	public void setLocalStrategy(LocalStrategy strategy) {
+		setLocalStrategy(strategy, null, null);
+	}
+	
+	public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
+		this.localStrategy = strategy;
+		this.localKeys = keys;
+		this.localSortOrder = sortDirection;
+		this.localProps = null;		// reset the local properties
+	}
+	
+	public LocalStrategy getLocalStrategy() {
+		return this.localStrategy;
+	}
+	
+	public FieldList getLocalStrategyKeys() {
+		return this.localKeys;
+	}
+	
+	public boolean[] getLocalStrategySortOrder() {
+		return this.localSortOrder;
+	}
+	
+	public void setDataDistribution(DataDistribution dataDistribution) {
+		this.dataDistribution = dataDistribution;
+	}
+	
+	public DataDistribution getDataDistribution() {
+		return this.dataDistribution;
+	}
+	
+	public Partitioner<?> getPartitioner() {
+		return partitioner;
+	}
+	
+	public TempMode getTempMode() {
+		return this.tempMode;
+	}
+
+	/**
+	 * Sets the temp mode of the connection.
+	 * 
+	 * @param tempMode
+	 *        The temp mode of the connection.
+	 */
+	public void setTempMode(TempMode tempMode) {
+		this.tempMode = tempMode;
+	}
+	
+	/**
+	 * Gets the memory for materializing the channel's result from this Channel.
+	 *
+	 * @return The temp memory.
+	 */
+	public double getRelativeTempMemory() {
+		return this.relativeTempMemory;
+	}
+	
+	/**
+	 * Sets the memory for materializing the channel's result from this Channel.
+	 *
+	 * @param relativeTempMemory The memory for materialization.
+	 */
+	public void setRelativeTempMemory(double relativeTempMemory) {
+		this.relativeTempMemory = relativeTempMemory;
+	}
+	
+	/**
+	 * Sets the replication factor of the connection.
+	 * 
+	 * @param factor The replication factor of the connection.
+	 */
+	public void setReplicationFactor(int factor) {
+		this.replicationFactor = factor;
+	}
+	
+	/**
+	 * Returns the replication factor of the connection.
+	 * 
+	 * @return The replication factor of the connection.
+	 */
+	public int getReplicationFactor() {
+		return this.replicationFactor;
+	}
+	
+	/**
+	 * Gets the serializer from this Channel.
+	 *
+	 * @return The serializer.
+	 */
+	public TypeSerializerFactory<?> getSerializer() {
+		return serializer;
+	}
+	
+	/**
+	 * Sets the serializer for this Channel.
+	 *
+	 * @param serializer The serializer to set.
+	 */
+	public void setSerializer(TypeSerializerFactory<?> serializer) {
+		this.serializer = serializer;
+	}
+	
+	/**
+	 * Gets the ship strategy comparator from this Channel.
+	 *
+	 * @return The ship strategy comparator.
+	 */
+	public TypeComparatorFactory<?> getShipStrategyComparator() {
+		return shipStrategyComparator;
+	}
+	
+	/**
+	 * Sets the ship strategy comparator for this Channel.
+	 *
+	 * @param shipStrategyComparator The ship strategy comparator to set.
+	 */
+	public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) {
+		this.shipStrategyComparator = shipStrategyComparator;
+	}
+	
+	/**
+	 * Gets the local strategy comparator from this Channel.
+	 *
+	 * @return The local strategy comparator.
+	 */
+	public TypeComparatorFactory<?> getLocalStrategyComparator() {
+		return localStrategyComparator;
+	}
+	
+	/**
+	 * Sets the local strategy comparator for this Channel.
+	 *
+	 * @param localStrategyComparator The local strategy comparator to set.
+	 */
+	public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) {
+		this.localStrategyComparator = localStrategyComparator;
+	}
+	
+	public double getRelativeMemoryLocalStrategy() {
+		return relativeMemoryLocalStrategy;
+	}
+	
+	public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
+		this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
+	}
+	
+	public boolean isOnDynamicPath() {
+		return this.source.isOnDynamicPath();
+	}
+	
+	public int getCostWeight() {
+		return this.source.getCostWeight();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                Statistic Estimates
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public long getEstimatedOutputSize() {
+		long estimate = this.source.template.getEstimatedOutputSize();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
+	}
+
+	@Override
+	public long getEstimatedNumRecords() {
+		long estimate =  this.source.template.getEstimatedNumRecords();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
+	}
+	
+	@Override
+	public float getEstimatedAvgWidthPerOutputRecord() {
+		return this.source.template.getEstimatedAvgWidthPerOutputRecord();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                Data Property Handling
+	// --------------------------------------------------------------------------------------------
+	
+
+	public RequestedGlobalProperties getRequiredGlobalProps() {
+		return requiredGlobalProps;
+	}
+
+	public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+		this.requiredGlobalProps = requiredGlobalProps;
+	}
+
+	public RequestedLocalProperties getRequiredLocalProps() {
+		return requiredLocalProps;
+	}
+
+	public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+		this.requiredLocalProps = requiredLocalProps;
+	}
+
+	public GlobalProperties getGlobalProperties() {
+		if (this.globalProps == null) {
+			this.globalProps = this.source.getGlobalProperties().clone();
+			switch (this.shipStrategy) {
+				case BROADCAST:
+					this.globalProps.clearUniqueFieldCombinations();
+					this.globalProps.setFullyReplicated();
+					break;
+				case PARTITION_HASH:
+					this.globalProps.setHashPartitioned(this.shipKeys);
+					break;
+				case PARTITION_RANGE:
+					this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
+					break;
+				case FORWARD:
+					break;
+				case PARTITION_RANDOM:
+					this.globalProps.reset();
+					break;
+				case PARTITION_FORCED_REBALANCE:
+					this.globalProps.setForcedRebalanced();
+					break;
+				case PARTITION_CUSTOM:
+					this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
+					break;
+				case NONE:
+					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
+			}
+		}
+		
+		return this.globalProps;
+	}
+	
+	public LocalProperties getLocalProperties() {
+		if (this.localProps == null) {
+			computeLocalPropertiesAfterShippingOnly();
+			switch (this.localStrategy) {
+				case NONE:
+					break;
+				case SORT:
+				case COMBININGSORT:
+					this.localProps = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
+					break;
+				default:
+					throw new CompilerException("Unsupported local strategy for channel.");
+			}
+		}
+		
+		return this.localProps;
+	}
+	
+	private void computeLocalPropertiesAfterShippingOnly() {
+		switch (this.shipStrategy) {
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
+				this.localProps = new LocalProperties();
+				break;
+			case FORWARD:
+				this.localProps = this.source.getLocalProperties();
+				break;
+			case NONE:
+				throw new CompilerException("ShipStrategy has not yet been set.");
+			default:
+				throw new CompilerException("Unknown ShipStrategy.");
+		}
+	}
+	
+	public void adjustGlobalPropertiesForFullParallelismChange() {
+		if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
+			throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
+					"change before the ship strategy is set.");
+		}
+		
+		// make sure the properties are acquired
+		if (this.globalProps == null) {
+			getGlobalProperties();
+		}
+		
+		// some strategies globally reestablish properties
+		switch (this.shipStrategy) {
+		case FORWARD:
+			throw new CompilerException("Cannot use FORWARD strategy between operations " +
+					"with different number of parallel instances.");
+		case NONE: // excluded by the sanity check above; listed here for completeness
+		case BROADCAST:
+		case PARTITION_HASH:
+		case PARTITION_RANGE:
+		case PARTITION_RANDOM:
+		case PARTITION_FORCED_REBALANCE:
+		case PARTITION_CUSTOM:
+			return;
+		}
+		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Utility method used while swapping binary union nodes for n-ary union nodes.
+	 */
+	public void swapUnionNodes(PlanNode newUnionNode) {
+		if (!(this.source instanceof BinaryUnionPlanNode)) {
+			throw new IllegalStateException();
+		} else {
+			this.source = newUnionNode;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public int getMaxDepth() {
+		return this.source.getOptimizerNode().getMaxDepth() + 1;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') +
+				'[' + this.shipStrategy + "] [" + this.localStrategy + "] " +
+				(this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
+	}
+
+	@Override
+	public Channel clone() {
+		try {
+			return (Channel) super.clone();
+		} catch (CloneNotSupportedException cnsex) {
+			throw new RuntimeException(cnsex);
+		}
+	}
+}
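
For illustration, a minimal sketch (not part of this commit) of how the optimizer wires up a
Channel using only the constructors and setters declared in this file. It assumes the
flink-optimizer classes are on the classpath and that two plan candidates, sourceCandidate and
targetCandidate, already exist; both names are placeholders.

Channel channel = new Channel(sourceCandidate);
channel.setTarget(targetCandidate);

// hash-partition the data on field 0 and exchange it in a pipelined fashion
channel.setShipStrategy(ShipStrategyType.PARTITION_HASH,
        new FieldList(0), DataExchangeMode.PIPELINED);

// sort each partition locally on field 0, ascending
channel.setLocalStrategy(LocalStrategy.SORT, new FieldList(0), new boolean[] { true });

// the data properties are derived lazily from the chosen strategies
GlobalProperties gp = channel.getGlobalProperties(); // hash-partitioned on [0]
LocalProperties lp  = channel.getLocalProperties();  // ordered on [0]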

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
new file mode 100644
index 0000000..01c56dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A plan candidate node for operators with two inputs, such as joins, co-groups, and cross products.
+ */
+public class DualInputPlanNode extends PlanNode {
+	
+	protected final Channel input1;
+	protected final Channel input2;
+	
+	protected final FieldList keys1;
+	protected final FieldList keys2;
+	
+	protected final boolean[] sortOrders;
+	
+	private TypeComparatorFactory<?> comparator1;
+	private TypeComparatorFactory<?> comparator2;
+	private TypePairComparatorFactory<?, ?> pairComparator;
+	
+	public Object postPassHelper1;
+	public Object postPassHelper2;
+	
+	// --------------------------------------------------------------------------------------------
+
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy) {
+		this(template, nodeName, input1, input2, driverStrategy, null, null, null);
+	}
+	
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2,
+			DriverStrategy driverStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2)
+	{
+		this(template, nodeName, input1, input2, driverStrategy, driverKeyFields1, driverKeyFields2,
+									SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
+	}
+	
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy,
+			FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders)
+	{
+		super(template, nodeName, driverStrategy);
+		this.input1 = input1;
+		this.input2 = input2;
+		this.keys1 = driverKeyFields1;
+		this.keys2 = driverKeyFields2;
+		this.sortOrders = driverSortOrders;
+		
+		if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input1.setReplicationFactor(getParallelism());
+		}
+		if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input2.setReplicationFactor(getParallelism());
+		}
+		
+		mergeBranchPlanMaps(input1.getSource(), input2.getSource());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public TwoInputNode getTwoInputNode() {
+		if (this.template instanceof TwoInputNode) {
+			return (TwoInputNode) this.template;
+		} else {
+			throw new RuntimeException();
+		}
+	}
+	
+	public FieldList getKeysForInput1() {
+		return this.keys1;
+	}
+	
+	public FieldList getKeysForInput2() {
+		return this.keys2;
+	}
+	
+	public boolean[] getSortOrders() {
+		return this.sortOrders;
+	}
+	
+	public TypeComparatorFactory<?> getComparator1() {
+		return this.comparator1;
+	}
+	
+	public TypeComparatorFactory<?> getComparator2() {
+		return this.comparator2;
+	}
+	
+	public void setComparator1(TypeComparatorFactory<?> comparator) {
+		this.comparator1 = comparator;
+	}
+	
+	public void setComparator2(TypeComparatorFactory<?> comparator) {
+		this.comparator2 = comparator;
+	}
+	
+	public TypePairComparatorFactory<?, ?> getPairComparator() {
+		return this.pairComparator;
+	}
+	
+	public void setPairComparator(TypePairComparatorFactory<?, ?> comparator) {
+		this.pairComparator = comparator;
+	}
+	
+	/**
+	 * Gets the first input channel to this node.
+	 * 
+	 * @return The first input channel to this node.
+	 */
+	public Channel getInput1() {
+		return this.input1;
+	}
+	
+	/**
+	 * Gets the second input channel to this node.
+	 * 
+	 * @return The second input channel to this node.
+	 */
+	public Channel getInput2() {
+		return this.input2;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			this.input1.getSource().accept(visitor);
+			this.input2.getSource().accept(visitor);
+			
+			for (Channel broadcastInput : getBroadcastInputs()) {
+				broadcastInput.getSource().accept(visitor);
+			}
+			
+			visitor.postVisit(this);
+		}
+	}
+	
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
+			return Arrays.asList(this.input1.getSource(), this.input2.getSource());
+		} else {
+			List<PlanNode> preds = new ArrayList<PlanNode>();
+			
+			preds.add(input1.getSource());
+			preds.add(input2.getSource());
+
+			for (Channel c : getBroadcastInputs()) {
+				preds.add(c.getSource());
+			}
+			
+			return preds;
+		}
+	}
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Arrays.asList(this.input1, this.input2);
+	}
+
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		
+		// check first input
+		SourceAndDamReport res1 = this.input1.getSource().hasDamOnPathDownTo(source);
+		if (res1 == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res1 == FOUND_SOURCE) {
+			if (this.input1.getLocalStrategy().dams() || this.input1.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) {
+				return FOUND_SOURCE_AND_DAM;
+			} else {
+				return FOUND_SOURCE;
+			}
+		}
+		else {
+			SourceAndDamReport res2 = this.input2.getSource().hasDamOnPathDownTo(source);
+			if (res2 == FOUND_SOURCE_AND_DAM) {
+				return FOUND_SOURCE_AND_DAM;
+			}
+			else if (res2 == FOUND_SOURCE) {
+				if (this.input2.getLocalStrategy().dams() || this.input2.getTempMode().breaksPipeline() ||
+						getDriverStrategy().secondDam() == DamBehavior.FULL_DAM) {
+					return FOUND_SOURCE_AND_DAM;
+				} else {
+					return FOUND_SOURCE;
+				}
+			}
+			else {
+				// NOT_FOUND
+				// check the broadcast inputs
+				
+				for (NamedChannel nc : getBroadcastInputs()) {
+					SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
+					if (bcRes != NOT_FOUND) {
+						// broadcast inputs are always dams
+						return FOUND_SOURCE_AND_DAM;
+					}
+				}
+				return NOT_FOUND;
+			}
+		}
+	}
+}
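
The constructor above sets the replication factor of a BROADCAST input to the node's parallelism;
together with Channel.getEstimatedOutputSize()/getEstimatedNumRecords(), this scales the channel's
estimates by the number of receiving parallel instances. A small, self-contained sketch of that
arithmetic (not Flink code; the names are hypothetical):

public class BroadcastEstimateSketch {

    // mirrors Channel's convention: negative estimates mean "unknown" and pass through unchanged
    static long estimatedChannelSize(long sourceEstimate, int replicationFactor) {
        return sourceEstimate < 0 ? sourceEstimate : sourceEstimate * replicationFactor;
    }

    public static void main(String[] args) {
        int targetParallelism = 8; // becomes the replication factor of a BROADCAST input

        System.out.println(estimatedChannelSize(1000000L, targetParallelism)); // 8000000
        System.out.println(estimatedChannelSize(-1L, targetParallelism));      // -1 (unknown)
    }
}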

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
new file mode 100644
index 0000000..d146c83
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+/**
+ * A common interface for compiled Flink plans for both batch and streaming
+ * processing programs.
+ * 
+ */
+public interface FlinkPlan {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
new file mode 100644
index 0000000..38f76b2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Interface for plan candidate nodes that represent iterations. It gives access to the iteration's
+ * step function (via the visitor) and to the corresponding iteration node in the optimizer DAG.
+ */
+public interface IterationPlanNode {
+	
+	void acceptForStepFunction(Visitor<PlanNode> visitor);
+	
+	IterationNode getIterationNode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
new file mode 100644
index 0000000..3650eea
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A union operation over multiple inputs (2 or more).
+ */
+public class NAryUnionPlanNode extends PlanNode {
+	
+	private final List<Channel> inputs;
+	
+	/**
+	 * Creates a new union plan node.
+	 *
+	 * @param template The union node in the optimizer DAG that this candidate was created for.
+	 * @param inputs The input channels of the union.
+	 * @param gProps The global properties of the union's output.
+	 * @param cumulativeCosts The cumulative costs of the sub-plan rooted at this node.
+	 */
+	public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps,
+			Costs cumulativeCosts)
+	{
+		super(template, "Union", DriverStrategy.NONE);
+		
+		this.inputs = inputs;
+		this.globalProps = gProps;
+		this.localProps = new LocalProperties();
+		this.nodeCosts = new Costs();
+		this.cumulativeCosts = cumulativeCosts;
+	}
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		visitor.preVisit(this);
+		for (Channel c : this.inputs) {
+			c.getSource().accept(visitor);
+		}
+		visitor.postVisit(this);
+	}
+	
+	public List<Channel> getListOfInputs() {
+		return this.inputs;
+	}
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.unmodifiableList(this.inputs);
+	}
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		final Iterator<Channel> channels = this.inputs.iterator();
+		return new IterableIterator<PlanNode>() {
+
+			@Override
+			public boolean hasNext() {
+				return channels.hasNext();
+			}
+
+			@Override
+			public PlanNode next() {
+				return channels.next().getSource();
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+			
+			@Override
+			public Iterator<PlanNode> iterator() {
+				return this;
+			}
+		};
+	}
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		// this node is only used after the plan enumeration; consequently, this method is never invoked here
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
new file mode 100644
index 0000000..da97e61
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.TempMode;
+
+public class NamedChannel extends Channel {
+
+	private final String name;
+
+	/**
+	 * Initializes a NamedChannel with the given name and source node.
+	 * 
+	 * @param name The name of the channel.
+	 * @param sourceNode The source node of the channel.
+	 */
+	public NamedChannel(String name, PlanNode sourceNode) {
+		super(sourceNode);
+		this.name = name;
+	}
+
+	public NamedChannel(String name, PlanNode sourceNode, TempMode tempMode) {
+		super(sourceNode, tempMode);
+		this.name = name;
+	}
+
+	public String getName() {
+		return this.name;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
new file mode 100644
index 0000000..d56be87
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collection;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.util.Visitable;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
+ * and {@link Channel}s that describe exactly how the program should be executed.
+ * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
+ * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).
+ */
+public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
+	
+	/** The data sources in the plan. */
+	private final Collection<SourcePlanNode> dataSources;
+
+	/** The data sinks in the plan. */
+	private final Collection<SinkPlanNode> dataSinks;
+
+	/** All nodes in the optimizer plan. */
+	private final Collection<PlanNode> allNodes;
+	
+	/** The original program. */
+	private final Plan originalProgram;
+
+	/** Name of the job */
+	private final String jobName;
+
+	/**
+	 * Creates a new instance of this optimizer plan container. The plan is given and fully
+	 * described by the data sources, sinks and the collection of all nodes.
+	 * 
+	 * @param sources The data sources.
+	 * @param sinks The data sinks.
+	 * @param allNodes A collection containing all nodes in the plan.
+	 * @param jobName The name of the program.
+	 * @param programPlan The original program plan from which this optimized plan was created.
+	 */
+	public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
+			Collection<PlanNode> allNodes, String jobName, Plan programPlan)
+	{
+		this.dataSources = sources;
+		this.dataSinks = sinks;
+		this.allNodes = allNodes;
+		this.jobName = jobName;
+		this.originalProgram = programPlan;
+	}
+
+	/**
+	 * Gets the data sources from this OptimizedPlan.
+	 * 
+	 * @return The data sources.
+	 */
+	public Collection<SourcePlanNode> getDataSources() {
+		return dataSources;
+	}
+
+	/**
+	 * Gets the data sinks from this OptimizedPlan.
+	 * 
+	 * @return The data sinks.
+	 */
+	public Collection<SinkPlanNode> getDataSinks() {
+		return dataSinks;
+	}
+
+	/**
+	 * Gets all the nodes from this OptimizedPlan.
+	 * 
+	 * @return All nodes.
+	 */
+	public Collection<PlanNode> getAllNodes() {
+		return allNodes;
+	}
+
+	/**
+	 * Returns the name of the program.
+	 * 
+	 * @return The name of the program.
+	 */
+	public String getJobName() {
+		return this.jobName;
+	}
+	
+	/**
+	 * Gets the original program plan from which this optimized plan was created.
+	 * 
+	 * @return The original program plan.
+	 */
+	public Plan getOriginalPactPlan() {
+		return this.originalProgram;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies the given visitor top down to all nodes, starting at the sinks.
+	 * 
+	 * @param visitor
+	 *        The visitor to apply to the nodes in this plan.
+	 * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
+	 */
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		for (SinkPlanNode node : this.dataSinks) {
+			node.accept(visitor);
+		}
+	}
+}
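
For illustration, a minimal sketch (not part of this commit) of traversing an OptimizedPlan with
the Visitor interface used above. Traversal starts at the sinks and follows the inputs towards the
sources; a node reachable through several channels may be visited more than once unless the visitor
keeps track of visited nodes. The class name is a placeholder.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.util.Visitor;

public class PlanNodeNameCollector implements Visitor<PlanNode> {

    private final List<String> names = new ArrayList<String>();

    @Override
    public boolean preVisit(PlanNode node) {
        names.add(node.getNodeName() + " [" + node.getDriverStrategy() + "]");
        return true; // descend into the node's inputs
    }

    @Override
    public void postVisit(PlanNode node) {
        // nothing to do on the way back up
    }

    public List<String> getNames() {
        return names;
    }

    public static List<String> collect(OptimizedPlan plan) {
        PlanNodeNameCollector collector = new PlanNodeNameCollector();
        plan.accept(collector); // visits every sink and, transitively, its predecessors
        return collector.getNames();
    }
}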

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
new file mode 100644
index 0000000..6f634fb
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.plandump.DumpableNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.util.Visitable;
+
+/**
+ * A PlanNode is a candidate node in the enumerated execution plan. The data exchange on its input
+ * channels can realize a shipping strategy, which establishes global properties, and a local strategy,
+ * which establishes local properties.
+ * <p>
+ * Because we currently deal only with plans where the operator order is fixed, many properties are equal
+ * among candidates and are determined prior to the enumeration (for example, constant/dynamic path membership).
+ * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was
+ * created for.
+ */
+public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<PlanNode> {
+	
+	protected final OptimizerNode template;
+	
+	protected final List<Channel> outChannels;
+	
+	private List<NamedChannel> broadcastInputs;
+	
+	private final String nodeName; 
+	
+	private DriverStrategy driverStrategy;	// The driver strategy (sort-merge, hashing, ...) executed by this node
+	
+	protected LocalProperties localProps; 			// local properties of the data produced by this node
+
+	protected GlobalProperties globalProps;			// global properties of the data produced by this node
+	
+	protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan alternative chosen at a branch point
+	
+	protected Costs nodeCosts;						// the costs incurred by this node
+
+	protected Costs cumulativeCosts;					// the cumulative costs of all operators in the sub-tree
+	
+	private double relativeMemoryPerSubTask;					// the relative amount of memory dedicated to each task (fraction of the available memory)
+	
+	private int parallelism;
+	
+	private boolean pFlag;							// flag for the internal pruning algorithm
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public PlanNode(OptimizerNode template, String nodeName, DriverStrategy strategy) {
+		this.outChannels = new ArrayList<Channel>(2);
+		this.broadcastInputs = new ArrayList<NamedChannel>();
+		this.template = template;
+		this.nodeName = nodeName;
+		this.driverStrategy = strategy;
+		
+		this.parallelism = template.getParallelism();
+
+		// check if there is a branch at this node. if yes, this candidate must be associated with
+		// the branching template node.
+		if (template.isBranching()) {
+			this.branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
+			this.branchPlan.put(template, this);
+		}
+	}
+	
+	protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) {
+		mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan);
+	}
+	
+	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
+		// merge the branchPlan maps according to the template's list of open (unclosed) branches
+		if (this.template.hasUnclosedBranches()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+			}
+	
+			for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+				OptimizerNode brancher = uc.getBranchingNode();
+				PlanNode selectedCandidate = null;
+	
+				if (branchPlan1 != null) {
+					// predecessor 1 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan1.get(brancher);
+				}
+				
+				if (selectedCandidate == null && branchPlan2 != null) {
+					// predecessor 2 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan2.get(brancher);
+				}
+				
+				// it may be that the branch candidate is only found once the broadcast variables are set
+				if (selectedCandidate != null) {
+					this.branchPlan.put(brancher, selectedCandidate);
+				}
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                           Accessors
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the node from the optimizer DAG for which this plan candidate node was created.
+	 * 
+	 * @return The optimizer's DAG node.
+	 */
+	public OptimizerNode getOriginalOptimizerNode() {
+		return this.template;
+	}
+	
+	/**
+	 * Gets the program operator that this node represents in the plan.
+	 * 
+	 * @return The program operator this node represents in the plan.
+	 */
+	public Operator<?> getProgramOperator() {
+		return this.template.getOperator();
+	}
+	
+	/**
+	 * Gets the name of the plan node.
+	 * 
+	 * @return The name of the plan node.
+	 */
+	public String getNodeName() {
+		return this.nodeName;
+	}
+	
+	public int getMemoryConsumerWeight() {
+		return this.driverStrategy.isMaterializing() ? 1 : 0;
+	}
+	
+	/**
+	 * Gets the memory dedicated to each sub-task for this node.
+	 * 
+	 * @return The relative memory per task (as a fraction of the available memory).
+	 */
+	public double getRelativeMemoryPerSubTask() {
+		return this.relativeMemoryPerSubTask;
+	}
+
+	/**
+	 * Sets the memory dedicated to each task for this node.
+	 * 
+	 * @param relativeMemoryPerSubtask The relative memory per sub-task
+	 */
+	public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) {
+		this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
+	}
+	
+	/**
+	 * Gets the driver strategy from this node. This determines, for example, whether a <i>match</i> (join)
+	 * uses a sort-merge or a hybrid hash strategy.
+	 * 
+	 * @return The driver strategy.
+	 */
+	public DriverStrategy getDriverStrategy() {
+		return this.driverStrategy;
+	}
+	
+	/**
+	 * Sets the driver strategy for this node. Usually should not be changed.
+	 * 
+	 * @param newDriverStrategy The driver strategy.
+	 */
+	public void setDriverStrategy(DriverStrategy newDriverStrategy) {
+		this.driverStrategy = newDriverStrategy;
+	}
+	
+	public void initProperties(GlobalProperties globals, LocalProperties locals) {
+		if (this.globalProps != null || this.localProps != null) {
+			throw new IllegalStateException();
+		}
+		this.globalProps = globals;
+		this.localProps = locals;
+	}
+	
+	/**
+	 * Gets the local properties from this PlanNode.
+	 *
+	 * @return The local properties.
+	 */
+	public LocalProperties getLocalProperties() {
+		return this.localProps;
+	}
+	
+	/**
+	 * Gets the global properties from this PlanNode.
+	 *
+	 * @return The global properties.
+	 */
+	public GlobalProperties getGlobalProperties() {
+		return this.globalProps;
+	}
+	
+	/**
+	 * Gets the costs incurred by this node. The costs reflect also the costs incurred by the shipping strategies
+	 * of the incoming connections.
+	 * 
+	 * @return The node-costs, or null, if not yet set.
+	 */
+	public Costs getNodeCosts() {
+		return this.nodeCosts;
+	}
+
+	/**
+	 * Gets the cumulative costs of this node. The cumulative costs are the sum of the costs
+	 * of this node and of all nodes in the subtree below this node.
+	 * 
+	 * @return The cumulative costs, or null, if not yet set.
+	 */
+	public Costs getCumulativeCosts() {
+		return this.cumulativeCosts;
+	}
+
+	public Costs getCumulativeCostsShare() {
+		if (this.cumulativeCosts == null) {
+			return null;
+		} else {
+			Costs result = cumulativeCosts.clone();
+			if (this.template.getOutgoingConnections() != null) {
+				int outDegree = this.template.getOutgoingConnections().size();
+				if (outDegree > 0) {
+					result.divideBy(outDegree);
+				}
+			}
+
+			return result;
+		}
+	}
+
+	
+	/**
+	 * Sets the basic cost for this node to the given value, and sets the cumulative costs
+	 * to those costs plus the cost shares of all inputs (regular and broadcast).
+	 * 
+	 * @param nodeCosts	 The already known costs for this node
+	 * 						(these costs are produced by a concrete {@code OptimizerNode} subclass).
+	 */
+	public void setCosts(Costs nodeCosts) {
+		// set the node costs
+		this.nodeCosts = nodeCosts;
+		
+		// the cumulative costs are the node costs plus the costs of all inputs
+		this.cumulativeCosts = nodeCosts.clone();
+		
+		// add all the normal inputs
+		for (PlanNode pred : getPredecessors()) {
+			
+			Costs parentCosts = pred.getCumulativeCostsShare();
+			if (parentCosts != null) {
+				this.cumulativeCosts.addCosts(parentCosts);
+			} else {
+				throw new CompilerException("Trying to set the costs of an operator before the predecessor costs are computed.");
+			}
+		}
+		
+		// add all broadcast variable inputs
+		if (this.broadcastInputs != null) {
+			for (NamedChannel nc : this.broadcastInputs) {
+				Costs bcInputCost = nc.getSource().getCumulativeCostsShare();
+				if (bcInputCost != null) {
+					this.cumulativeCosts.addCosts(bcInputCost);
+				} else {
+					throw new CompilerException("Trying to set the costs of an operator before the broadcast input costs are computed.");
+				}
+			}
+		}
+	}
+	
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+	}
+	
+	public int getParallelism() {
+		return this.parallelism;
+	}
+	
+	public long getGuaranteedAvailableMemory() {
+		return this.template.getMinimalMemoryAcrossAllSubTasks();
+	}
+
+	public Map<OptimizerNode, PlanNode> getBranchPlan() {
+		return branchPlan;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                               Input, Predecessors, Successors
+	// --------------------------------------------------------------------------------------------
+	
+	public abstract Iterable<Channel> getInputs();
+	
+	@Override
+	public abstract Iterable<PlanNode> getPredecessors();
+	
+	/**
+	 * Sets a list of all broadcast inputs attached to this node.
+	 */
+	public void setBroadcastInputs(List<NamedChannel> broadcastInputs) {
+		if (broadcastInputs != null) {
+			this.broadcastInputs = broadcastInputs;
+			
+			// update the branch map
+			for (NamedChannel nc : broadcastInputs) {
+				PlanNode source = nc.getSource();
+				
+				mergeBranchPlanMaps(branchPlan, source.branchPlan);
+			}
+		}
+		
+		// sanity check: if we are branching, we must now have a candidate for each branch point
+		if (this.template.hasUnclosedBranches()) {
+			if (this.branchPlan == null) {
+				throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
+			}
+	
+			for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+				OptimizerNode brancher = uc.getBranchingNode();
+				if (this.branchPlan.get(brancher) == null) {
+					throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Gets a list of all broadcast inputs attached to this node.
+	 */
+	public List<NamedChannel> getBroadcastInputs() {
+		return this.broadcastInputs;
+	}
+	
+	/**
+	 * Adds a channel to a successor node to this node.
+	 * 
+	 * @param channel The channel to the successor.
+	 */
+	public void addOutgoingChannel(Channel channel) {
+		this.outChannels.add(channel);
+	}
+	
+	/**
+	 * Gets a list of all outgoing channels leading to successors.
+	 * 
+	 * @return A list of all channels leading to successors.
+	 */
+	public List<Channel> getOutgoingChannels() {
+		return this.outChannels;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                Miscellaneous
+	// --------------------------------------------------------------------------------------------
+	
+	public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations) {
+		if (uniqueFieldCombinations == null || uniqueFieldCombinations.isEmpty()) {
+			return;
+		}
+		for (FieldSet fields : uniqueFieldCombinations) {
+			this.globalProps.addUniqueFieldCombination(fields);
+			this.localProps = this.localProps.addUniqueFields(fields);
+		}
+	}
+
+	public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) {
+		if (branchPlan == null) {
+			return null;
+		} else {
+			return this.branchPlan.get(branchPoint);
+		}
+	}
+	
+	/**
+	 * Sets the pruning marker to true.
+	 */
+	public void setPruningMarker() {
+		this.pFlag = true;
+	}
+	
+	/**
+	 * Checks whether the pruning marker was set.
+	 * 
+	 * @return True, if the pruning marker was set, false otherwise.
+	 */
+	public boolean isPruneMarkerSet() {
+		return this.pFlag;
+	}
+	
+	public boolean isOnDynamicPath() {
+		return this.template.isOnDynamicPath();
+	}
+	
+	public int getCostWeight() {
+		return this.template.getCostWeight();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Checks whether this node has a dam on the way down to the given source node. This method
+	 * returns either that (a) the source node is not found as a (transitive) child of this node,
+	 * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+	 * the path.
+	 * 
+	 * @param source The source node down to which the path is checked for a dam.
+	 * @return Whether the source node was found and, if so, whether a dam is on the path to it.
+	 */
+	public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
+	
+	public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+		if (this == partialSolution) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+		
+		boolean found = false;
+		boolean allMet = true;
+		boolean allLocallyMet = true;
+		
+		for (Channel input : getInputs()) {
+			FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+			
+			if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+				found = true;
+				continue;
+			}
+			else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+				return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+			}
+			else {
+				found = true;
+				
+				// the partial solution was on the path here. check whether the channel requires
+				// certain properties that are met, or whether the channel introduces new properties
+				
+				// if the plan introduces new global properties, then we can stop looking whether
+				// the feedback properties are sufficient to meet the requirements
+				if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+					continue;
+				}
+				
+				// first check whether this channel requires something that is not met
+				if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+					return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+				}
+				
+				// in general, not everything is met here already
+				allMet = false;
+				
+				// if the plan introduces new local properties, we can stop checking for matching local properties
+				if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+					
+					if (input.getLocalStrategy() == LocalStrategy.NONE) {
+						
+						if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+							return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+						}
+						
+						allLocallyMet = false;
+					}
+				}
+			}
+		}
+		
+		if (!found) {
+			return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+		} else if (allMet) {
+			return FeedbackPropertiesMeetRequirementsReport.MET;
+		} else if (allLocallyMet) {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+		} else {
+			return FeedbackPropertiesMeetRequirementsReport.PENDING;
+		}
+	}
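+
+	// Summary of the traversal above: inputs are checked recursively down to the partial solution;
+	// a single NOT_MET on any path makes the overall result NOT_MET, and a ship strategy other
+	// than FORWARD/NONE re-establishes global properties, so that path needs no further checking.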
+	
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public String toString() {
+		return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
+				" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public OptimizerNode getOptimizerNode() {
+		return this.template;
+	}
+
+	@Override
+	public PlanNode getPlanNode() {
+		return this;
+	}
+
+	@Override
+	public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
+		List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
+		
+		for (Channel c : getInputs()) {
+			allInputs.add(c);
+		}
+		
+		for (NamedChannel c : getBroadcastInputs()) {
+			allInputs.add(c);
+		}
+		
+		return allInputs;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static enum SourceAndDamReport {
+		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
+	}
+	
+	
+	
+	public static enum FeedbackPropertiesMeetRequirementsReport {
+		/** Indicates that the path is irrelevant, because it does not contain the partial solution. */
+		NO_PARTIAL_SOLUTION,
+		
+		/** Indicates that it is still undetermined whether the properties are met; the answer
+		 * depends on both the global and the local properties of the feedback. */
+		PENDING,
+		
+		/** Indicates that it is still undetermined whether the properties are met; the local
+		 * properties are already met, so the answer depends on the global properties only. */
+		PENDING_LOCAL_MET,
+		
+		/** Indicates that the properties are met. */
+		MET,
+		
+		/** Indicates that the properties are not met. */
+		NOT_MET;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
new file mode 100644
index 0000000..b928be7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A plan candidate node for an operator with a single input.
+ */
+public class SingleInputPlanNode extends PlanNode {
+	
+	protected final Channel input;
+	
+	protected final FieldList[] driverKeys;
+	
+	protected final boolean[][] driverSortOrders;
+	
+	private TypeComparatorFactory<?>[] comparators;
+	
+	public Object postPassHelper;
+	
+	// --------------------------------------------------------------------------------------------
+
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, DriverStrategy driverStrategy) {
+		this(template, nodeName, input, driverStrategy, null, null);
+	}
+	
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, 
+			DriverStrategy driverStrategy, FieldList driverKeyFields)
+	{
+		this(template, nodeName, input, driverStrategy, driverKeyFields, getTrueArray(driverKeyFields.size()));
+	}
+	
+	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, 
+			DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders)
+	{
+		super(template, nodeName, driverStrategy);
+		this.input = input;
+		
+		this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
+		this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()];
+		this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][];
+		
+		if (driverStrategy.getNumRequiredComparators() > 0) {
+			this.driverKeys[0] = driverKeyFields;
+			this.driverSortOrders[0] = driverSortOrders;
+		}
+		
+		if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input.setReplicationFactor(getParallelism());
+		}
+		
+		final PlanNode predNode = input.getSource();
+		
+		if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+			
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			this.branchPlan.putAll(predNode.branchPlan);
+		}
+	}
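+
+	// Note: the driver key and sort-order arrays are sized by the number of comparators the
+	// DriverStrategy requires; slot 0 is filled from the constructor arguments (if a comparator
+	// is required at all), and further slots can be set later via setDriverKeyInfo(...).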
+
+	// --------------------------------------------------------------------------------------------
+	
+	public SingleInputNode getSingleInputNode() {
+		if (this.template instanceof SingleInputNode) {
+			return (SingleInputNode) this.template;
+		} else {
+			throw new RuntimeException();
+		}
+	}
+	
+	/**
+	 * Gets the input channel to this node.
+	 * 
+	 * @return The input channel to this node.
+	 */
+	public Channel getInput() {
+		return this.input;
+	}
+	
+	/**
+	 * Gets the predecessor of this node, i.e. the source of the input channel.
+	 * 
+	 * @return The predecessor of this node.
+	 */
+	public PlanNode getPredecessor() {
+		return this.input.getSource();
+	}
+	
+	/**
+	 * Sets the key field indexes for the specified driver comparator.
+	 * 
+	 * @param keys The key field indexes for the specified driver comparator.
+	 * @param id The ID of the driver comparator.
+	 */
+	public void setDriverKeyInfo(FieldList keys, int id) {
+		this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
+	}
+	
+	/**
+	 * Sets the key field information for the specified driver comparator.
+	 * 
+	 * @param keys The key field indexes for the specified driver comparator.
+	 * @param sortOrder The key sort order for the specified driver comparator.
+	 * @param id The ID of the driver comparator.
+	 */
+	public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
+		if (id < 0 || id >= driverKeys.length) {
+			throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
+					+ super.getDriverStrategy().getNumRequiredComparators() + " comparators.");
+		}
+		this.driverKeys[id] = keys;
+		this.driverSortOrders[id] = sortOrder;
+	}
+	
+	/**
+	 * Gets the key field indexes for the specified driver comparator.
+	 * 
+	 * @param id The id of the driver comparator for which the key field indexes are requested.
+	 * @return The key field indexes of the specified driver comparator.
+	 */
+	public FieldList getKeys(int id) {
+		return this.driverKeys[id];
+	}
+	
+	/**
+	 * Gets the sort order for the specified driver comparator.
+	 * 
+	 * @param id The id of the driver comparator for which the sort order is requested.
+	 * @return The sort order of the specified driver comparator.
+	 */
+	public boolean[] getSortOrders(int id) {
+		return driverSortOrders[id];
+	}
+	
+	/**
+	 * Gets the specified comparator from this PlanNode.
+	 * 
+	 * @param id The ID of the requested comparator.
+	 *
+	 * @return The specified comparator.
+	 */
+	public TypeComparatorFactory<?> getComparator(int id) {
+		return comparators[id];
+	}
+	
+	/**
+	 * Sets the specified comparator for this PlanNode.
+	 *
+	 * @param comparator The comparator to set.
+	 * @param id The ID of the comparator to set.
+	 */
+	public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+		this.comparators[id] = comparator;
+	}
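+
+	// Illustrative usage sketch (hypothetical names, not part of this class): a strategy that
+	// needs two key fields with mixed sort order could be configured as
+	//
+	//     planNode.setDriverKeyInfo(new FieldList(0, 2), new boolean[] { true, false }, 0);
+	//     planNode.setComparator(comparatorFactory, 0);
+	//
+	// where 'planNode' and 'comparatorFactory' are assumed to exist in the calling code.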
+	
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			this.input.getSource().accept(visitor);
+			
+			for (Channel broadcastInput : getBroadcastInputs()) {
+				broadcastInput.getSource().accept(visitor);
+			}
+			
+			visitor.postVisit(this);
+		}
+	}
+
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
+			return Collections.singleton(this.input.getSource());
+		}
+		else {
+			List<PlanNode> preds = new ArrayList<PlanNode>();
+			preds.add(input.getSource());
+			
+			for (Channel c : getBroadcastInputs()) {
+				preds.add(c.getSource());
+			}
+			
+			return preds;
+		}
+	}
+
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.singleton(this.input);
+	}
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res == FOUND_SOURCE) {
+			return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+		}
+		else {
+			// NOT_FOUND
+			// check the broadcast inputs
+			
+			for (NamedChannel nc : getBroadcastInputs()) {
+				SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
+				if (bcRes != NOT_FOUND) {
+					// broadcast inputs are always dams
+					return FOUND_SOURCE_AND_DAM;
+				}
+			}
+			return NOT_FOUND;
+		}
+	}
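+
+	// In short: once the source is found on the input path, the report is upgraded to
+	// FOUND_SOURCE_AND_DAM if the local strategy, the temp mode, or the driver strategy dams the
+	// stream; paths that lead through broadcast inputs always count as dammed.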
+	
+	// --------------------------------------------------------------------------------------------
+	
+	protected static boolean[] getTrueArray(int length) {
+		final boolean[] a = new boolean[length];
+		for (int i = 0; i < length; i++) {
+			a[i] = true;
+		}
+		return a;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
new file mode 100644
index 0000000..451484d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Plan candidate node for the sink-joiner utility operator, which joins multiple data sinks
+ * into a single root so the optimizer can enumerate and traverse the plan from one node.
+ */
+public class SinkJoinerPlanNode extends DualInputPlanNode {
+	
+	public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
+		super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public void setCosts(Costs nodeCosts) {
+		// the plan enumeration logic works as for regular two-input operators, which is important
+		// because of the branch handling logic. it does pick redistributing network channels
+		// between the sink and the sink joiner, because the sink joiner has a different degree of
+		// parallelism (DOP) than the sink. we discard any such cost and simply use the sum of the
+		// costs from the two children.
+		
+		Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
+		totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
+		super.setCosts(totalCosts);
+	}
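+
+	// Illustrative example: children with cumulative costs of 5 and 7 cost units yield totalCosts
+	// of 12 units here; the sink joiner itself is not charged any cost of its own.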
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public void getDataSinks(List<SinkPlanNode> sinks) {
+		final PlanNode in1 = this.input1.getSource();
+		final PlanNode in2 = this.input2.getSource();
+		
+		if (in1 instanceof SinkPlanNode) {
+			sinks.add((SinkPlanNode) in1);
+		} else if (in1 instanceof SinkJoinerPlanNode) {
+			((SinkJoinerPlanNode) in1).getDataSinks(sinks);
+		} else {
+			throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
+		}
+		
+		if (in2 instanceof SinkPlanNode) {
+			sinks.add((SinkPlanNode) in2);
+		} else if (in2 instanceof SinkJoinerPlanNode) {
+			((SinkJoinerPlanNode) in2).getDataSinks(sinks);
+		} else {
+			throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
new file mode 100644
index 0000000..656e67f
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Plan candidate node for data flow sinks.
+ */
+public class SinkPlanNode extends SingleInputPlanNode
+{
+	/**
+	 * Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
+	 * local sorting and range partitioning are handled by the incoming channel already.
+	 * 
+	 * @param template The template optimizer node that this candidate is created for.
+	 * @param nodeName The name of the plan node.
+	 * @param input The input channel to the sink.
+	 */
+	public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) {
+		super(template, nodeName, input, DriverStrategy.NONE);
+		
+		this.globalProps = input.getGlobalProperties().clone();
+		this.localProps = input.getLocalProperties().clone();
+	}
+	
+	public DataSinkNode getSinkNode() {
+		if (this.template instanceof DataSinkNode) {
+			return (DataSinkNode) this.template;
+		} else {
+			throw new RuntimeException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
new file mode 100644
index 0000000..63093dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the solution set of a workset (delta) iteration.
+ */
+public class SolutionSetPlanNode extends PlanNode {
+	
+	private static final Costs NO_COSTS = new Costs();
+	
+	private WorksetIterationPlanNode containingIterationNode;
+	
+	private final Channel initialInput;
+	
+	public Object postPassHelper;
+	
+	
+	public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
+			GlobalProperties gProps, LocalProperties lProps,
+			Channel initialInput)
+	{
+		super(template, nodeName, DriverStrategy.NONE);
+		
+		this.globalProps = gProps;
+		this.localProps = lProps;
+		this.initialInput = initialInput;
+		
+		// the node incurs no cost
+		this.nodeCosts = NO_COSTS;
+		this.cumulativeCosts = NO_COSTS;
+		
+		if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			
+			this.branchPlan.putAll(initialInput.getSource().branchPlan);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public SolutionSetNode getSolutionSetNode() {
+		return (SolutionSetNode) this.template;
+	}
+	
+	public WorksetIterationPlanNode getContainingIterationNode() {
+		return this.containingIterationNode;
+	}
+	
+	public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+		this.containingIterationNode = containingIterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		
+		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) {
+			return FOUND_SOURCE_AND_DAM;
+		} else {
+			return NOT_FOUND;
+		}
+	}
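+
+	// In short: if the source is found through the initial input (or is this node itself), the
+	// path is always reported as dammed, i.e. the solution set acts as a pipeline breaker.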
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
new file mode 100644
index 0000000..11b7cc9
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for data flow sources that have no input and no special strategies.
+ */
+public class SourcePlanNode extends PlanNode {
+	
+	private TypeSerializerFactory<?> serializer;
+	
+	/**
+	 * Constructs a new source candidate node that uses <i>NONE</i> as its local strategy.
+	 * 
+	 * @param template The template optimizer node that this candidate is created for.
+	 * @param nodeName The name of the plan node.
+	 */
+	public SourcePlanNode(DataSourceNode template, String nodeName) {
+		this(template, nodeName, new GlobalProperties(), new LocalProperties());
+	}
+
+	public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) {
+		super(template, nodeName, DriverStrategy.NONE);
+
+		this.globalProps = gprops;
+		this.localProps = lprops;
+		updatePropertiesWithUniqueSets(template.getUniqueFields());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public DataSourceNode getDataSourceNode() {
+		return (DataSourceNode) this.template;
+	}
+	
+	/**
+	 * Gets the serializer from this PlanNode.
+	 *
+	 * @return The serializer.
+	 */
+	public TypeSerializerFactory<?> getSerializer() {
+		return serializer;
+	}
+	
+	/**
+	 * Sets the serializer for this PlanNode.
+	 *
+	 * @param serializer The serializer to set.
+	 */
+	public void setSerializer(TypeSerializerFactory<?> serializer) {
+		this.serializer = serializer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		} else {
+			return NOT_FOUND;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
new file mode 100644
index 0000000..880f2e3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Abstract class representing Flink Streaming plans.
+ */
+public abstract class StreamingPlan implements FlinkPlan {
+
+	public abstract JobGraph getJobGraph();
+
+	public abstract String getStreamingPlanAsJSON();
+
+	public abstract void dumpStreamingPlanAsJSON(File file) throws IOException;
+
+}