You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:31 UTC

[52/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
deleted file mode 100644
index d77fe1e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
+++ /dev/null
@@ -1,1372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.optimizer.dag.GroupCombineNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.dag.SortPartitionNode;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.CoGroupNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
-import org.apache.flink.optimizer.dag.CrossNode;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.FilterNode;
-import org.apache.flink.optimizer.dag.FlatMapNode;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.MapPartitionNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
-import org.apache.flink.optimizer.dag.PartitionNode;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.postpass.OptimizerPostPass;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer that takes the user specified program plan and creates an optimized plan that contains
- * exact descriptions about how the physical execution will take place. It first translates the user
- * program into an internal optimizer representation and then chooses between different alternatives
- * for shipping strategies and local strategies.
- * <p>
- * The basic principle is taken from optimizer works in systems such as Volcano/Cascades and Selinger/System-R/DB2. The
- * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources generating
- * alternative plans, pruning against the interesting properties.
- * <p>
- * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
- * sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
- */
-public class PactCompiler {
-
-	// ------------------------------------------------------------------------
-	// Constants
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
-	 * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
-	 * If the operator has two input channels, the shipping strategy is applied to both input channels.
-	 */
-	public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
-
-	/**
-	 * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
-	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
-	 * to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
-	 */
-	public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
-
-	/**
-	 * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
-	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
-	 * to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
-	 */
-	public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
-
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
-	 * input channel, i.e. no redistribution of any kind.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a random repartition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION= "SHIP_REPARTITION";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a range-partition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
-
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
-	 * input channel.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
-
-	/**
-	 * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
-	 * parameters. The corresponding value tells the compiler which local strategy to use to process the
-	 * data inside one partition.
-	 * <p>
-	 * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
-	 * have no choice in their local strategy (such as <i>Cross</i>).
-	 */
-	public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
-
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
-	 * For example, a <i>Reduce</i> operator will sort the data to group it.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
-	 * During sorting a combine method is repeatedly applied to reduce the data volume.
-	 * For example, a <i>Reduce</i> operator will sort the data to group it.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
-	 * inputs with subsequent merging of inputs. 
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
-	 * The first input is sorted, the second input is assumed to be sorted. After sorting both inputs are merged.
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
-	 * The second input is sorted, the first input is assumed to be sorted. After sorting both inputs are merged.
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
-	 * Both inputs are assumed to be sorted and are merged. 
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
-
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
-	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
-	 * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
-	 * used to probe the table.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
-	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
-	 * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
-	 * used to probe the table.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
-	 * Hence, the data of the first input is streamed through,
-	 * while the data of the second input is stored on disk
-	 * and repeatedly read.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
-	 * Hence, the data of the second input is streamed through,
-	 * while the data of the first input is stored on disk
-	 * and repeatedly read.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
-	 * Furthermore, the first input, being the outer side, will be processed in blocks,
-	 * and for each block, the second input, being the inner side,
-	 * will be read repeatedly from disk.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
-	 * Furthermore, the second input, being the outer side, will be processed in blocks,
-	 * and for each block, the first input, being the inner side,
-	 * will be read repeatedly from disk.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
-	
-	/**
-	 * The log handle that is used by the compiler to log messages.
-	 */
-	public static final Logger LOG = LoggerFactory.getLogger(PactCompiler.class);
-
-	// ------------------------------------------------------------------------
-	// Members
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The statistics object used to obtain statistics, such as input sizes,
-	 * for the cost estimation process.
-	 */
-	private final DataStatistics statistics;
-
-	/**
-	 * The cost estimator used by the compiler.
-	 */
-	private final CostEstimator costEstimator;
-
-	/**
-	 * The default degree of parallelism for jobs compiled by this compiler.
-	 */
-	private int defaultDegreeOfParallelism;
-
-
-	// ------------------------------------------------------------------------
-	// Constructor & Setup
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
-	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
-	 * of the most robust execution strategies.
-	 * <p>
-	 * Equivalent to calling {@link #PactCompiler(DataStatistics, CostEstimator)} with <tt>null</tt>
-	 * statistics and a new {@link DefaultCostEstimator}.
-	 */
-	public PactCompiler() {
-		this(null, new DefaultCostEstimator());
-	}
-
-	/**
-	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the optimizer can make better choices for the execution strategies.
-	 * <p>
-	 * Costs are computed with a new {@link DefaultCostEstimator}; this constructor delegates to
-	 * {@link #PactCompiler(DataStatistics, CostEstimator)}.
-	 * 
-	 * @param stats
-	 *        The statistics to be used to determine the input properties.
-	 */
-	public PactCompiler(DataStatistics stats) {
-		this(stats, new DefaultCostEstimator());
-	}
-
-	/**
-	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
-	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
-	 * of the most robust execution strategies.
-	 * <p>
-	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
-	 * This constructor delegates to {@link #PactCompiler(DataStatistics, CostEstimator)} with
-	 * <tt>null</tt> statistics.
-	 * 
-	 * @param estimator The cost estimator to use to cost the individual operations.
-	 */
-	public PactCompiler(CostEstimator estimator) {
-		this(null, estimator);
-	}
-
-	/**
-	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the optimizer can make better choices for the execution strategies.
-	 *
-	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
-	 * 
-	 * @param stats
-	 *        The statistics to be used to determine the input properties.
-	 * @param estimator
-	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
-	 */
-	public PactCompiler(DataStatistics stats, CostEstimator estimator) {
-		this.statistics = stats;
-		this.costEstimator = estimator;
-
-		// determine the default parallelism
-		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-		
-		if (defaultDegreeOfParallelism < 1) {
-			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
-					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
-			this.defaultDegreeOfParallelism = 1;
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//                             Getters / Setters
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the default degree of parallelism for jobs compiled by this compiler.
-	 *
-	 * @return The default degree of parallelism.
-	 */
-	public int getDefaultDegreeOfParallelism() {
-		return this.defaultDegreeOfParallelism;
-	}
-	
-	/**
-	 * Sets the default degree of parallelism for jobs compiled by this compiler.
-	 *
-	 * @param defaultDegreeOfParallelism The new default parallelism; must be greater than zero.
-	 * @throws IllegalArgumentException Thrown, if the given value is zero or negative.
-	 */
-	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
-		// reject invalid values up front, leaving the current setting untouched
-		if (defaultDegreeOfParallelism <= 0) {
-			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
-		}
-
-		this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
-	}
-	
-	// ------------------------------------------------------------------------
-	//                               Compilation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
-	 * and all channels have a shipping strategy assigned.
-	 * <p>
-	 * The post pass is obtained from the program itself; the actual work is done by
-	 * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)},
-	 * whose comments describe the optimization phases in more detail.
-	 * 
-	 * @param program The program to be translated.
-	 * @return The optimized plan.
-	 *
-	 * @throws CompilerException
-	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
-	 *         situation during the compilation process.
-	 */
-	public OptimizedPlan compile(Plan program) throws CompilerException {
-		return compile(program, getPostPassFromPlan(program));
-	}
-
-	/**
-	 * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
-	 * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
-	 * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
-	 * where to cache intermediate results, etc.
-	 * <p>
-	 * The optimization happens in multiple phases:
-	 * <ol>
-	 * <li>Create the optimizer DAG representation of the program: create <tt>OptimizerNode</tt>
-	 * representations of the operators, assign parallelism and compute size estimates.</li>
-	 * <li>Compute interesting properties and auxiliary structures.</li>
-	 * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
-	 * opposed to the Database approaches), because we support plans that are not trees.</li>
-	 * </ol>
-	 * The single best plan is then passed through the {@link DeadlockPreventer}, finalized, has its
-	 * binary unions replaced, and is handed to the post pass.
-	 * 
-	 * @param program The program to be translated.
-	 * @param postPasser The function to be used for post passing the optimizer's plan and setting the
-	 *                   data type specific serialization routines.
-	 * @return The optimized plan.
-	 * 
-	 * @throws NullPointerException
-	 *         Thrown, if the program or the post pass is null.
-	 * @throws CompilerException
-	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
-	 *         situation during the compilation process.
-	 */
-	private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
-		if (program == null) {
-			throw new NullPointerException("The program to compile must not be null.");
-		}
-		if (postPasser == null) {
-			throw new NullPointerException("The optimizer post pass must not be null.");
-		}
-
-		LOG.debug("Beginning compilation of program '{}'", program.getJobName());
-
-		final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
-
-		// a parallelism set on the program takes precedence over the compiler's default
-		final int defaultParallelism = program.getDefaultParallelism() > 0 ?
-			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-
-		// log the default settings
-		LOG.debug("Using a default parallelism of {}",  defaultParallelism);
-		LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
-
-		// the first step in the compilation is to create the optimizer plan representation
-		// this step does the following:
-		// 1) It creates an optimizer plan node for each operator
-		// 2) It connects them via channels
-		// 3) It looks for hints about local strategies and channel types and
-		// sets the types and strategies accordingly
-		// 4) It makes estimates about the data volume of the data sources and
-		// propagates those estimates through the plan
-
-		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
-		program.accept(graphCreator);
-
-		// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
-		// each until we have only a single root node. This allows to transparently deal with the nodes with
-		// multiple outputs
-		final List<DataSinkNode> sinks = graphCreator.getSinks();
-		if (sinks.isEmpty()) {
-			throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
-		}
-
-		final Iterator<DataSinkNode> sinkIter = sinks.iterator();
-		OptimizerNode rootNode = sinkIter.next();
-		while (sinkIter.hasNext()) {
-			rootNode = new SinkJoiner(rootNode, sinkIter.next());
-		}
-
-		// now that all nodes are created, run the id-and-estimates pass over the graph, using the
-		// statistics (if any) that this compiler was created with
-		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
-
-		// We are dealing with operator DAGs, rather than operator trees.
-		// That requires us to deviate at some points from the classical DB optimizer algorithms.
-		// This step builds some auxiliary structures to help track branches and joins in the DAG
-		BranchesVisitor branchingVisitor = new BranchesVisitor();
-		rootNode.accept(branchingVisitor);
-
-		// Propagate the interesting properties top-down through the graph
-		InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
-		rootNode.accept(propsVisitor);
-
-		// perform a sanity check: the root may not have any unclosed branches
-		if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
-			throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
-					"track the re-joining of branches correctly.");
-		}
-
-		// the final step is now to generate the actual plan alternatives
-		List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
-
-		// exactly one plan must survive; report the actual count, since zero plans is just as wrong as several
-		if (bestPlan.size() != 1) {
-			throw new CompilerException("Error in compiler: expected exactly one best plan, but "
-					+ bestPlan.size() + " were created.");
-		}
-
-		// check if the best plan's root is a data sink (single sink plan)
-		// if so, directly take it. if it is a sink joiner node, get its contained sinks
-		PlanNode bestPlanRoot = bestPlan.get(0);
-		List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
-
-		if (bestPlanRoot instanceof SinkPlanNode) {
-			bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
-		} else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
-			((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
-		} else {
-			// fail loudly instead of silently finalizing a plan without any sinks
-			throw new CompilerException("Bug: The root of the best plan is neither a data sink nor a sink joiner: "
-					+ (bestPlanRoot == null ? "null" : bestPlanRoot.getClass().getName()));
-		}
-
-		DeadlockPreventer dp = new DeadlockPreventer();
-		dp.resolveDeadlocks(bestPlanSinks);
-
-		// finalize the plan
-		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
-
-		plan.accept(new BinaryUnionReplacer());
-
-		// post pass the plan. this is the phase where the serialization and comparator code is set
-		postPasser.postPass(plan);
-
-		return plan;
-	}
-
-	/**
-	 * Performs only the first step of the compilation process: the creation of the optimizer
-	 * representation of the plan. No estimations or enumerations of alternatives are done here.
-	 * <p>
-	 * The graph is created with a default parallelism of 1 and without a default data exchange mode.
-	 * 
-	 * @param program The plan to generate the optimizer representation for.
-	 * @return The optimizer representation of the plan, as the list of all its data sinks,
-	 *         from which the plan can be traversed.
-	 */
-	public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
-		final GraphCreatingVisitor visitor = new GraphCreatingVisitor(1, null);
-		program.accept(visitor);
-
-		return visitor.getSinks();
-	}
-	
-	// ------------------------------------------------------------------------
-	//                 Visitors for Compilation Traversals
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * This utility class performs the translation from the user specified program to the optimizer plan.
-	 * It works as a visitor that walks the user's job in a depth-first fashion. During the descent, it creates
-	 * an optimizer node for each operator, data source, or data sink. During the ascent, it connects
-	 * the nodes to the full graph.
-	 * <p>
-	 * This translator relies on the <code>setInputs</code> method in the nodes. As that method implements the size
-	 * estimation and the awareness for optimizer hints, the sizes will be properly estimated and the translated plan
-	 * already respects all optimizer hints.
-	 */
-	public static final class GraphCreatingVisitor implements Visitor<Operator<?>> {
-		
-		private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
-																// corresponding optimizer nodes
-
-		private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
-
-		private final int defaultParallelism; // the default degree of parallelism
-		
-		private final GraphCreatingVisitor parent;	// reference to enclosing creator, in case of a recursive translation
-
-		private final ExecutionMode defaultDataExchangeMode;
-
-		private final boolean forceDOP;
-
-		
-		/**
-		 * Creates a top-level graph creating visitor: one without a parent visitor, with the
-		 * <tt>forceDOP</tt> flag set to false, and with a fresh operator-to-node map.
-		 *
-		 * @param defaultParallelism The default degree of parallelism.
-		 * @param defaultDataExchangeMode The default data exchange mode.
-		 */
-		public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
-			this(null, false, defaultParallelism, defaultDataExchangeMode, null);
-		}
-
-		/**
-		 * Creates a graph creating visitor, optionally nested inside an enclosing visitor.
-		 *
-		 * @param parent The enclosing creator in case of a recursive translation, or null.
-		 * @param forceDOP The value for this visitor's <tt>forceDOP</tt> flag.
-		 * @param defaultParallelism The default degree of parallelism.
-		 * @param dataExchangeMode The default data exchange mode.
-		 * @param closure An existing operator-to-node map to work on; if null, a new empty map is created.
-		 */
-		private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
-									ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
-			this.parent = parent;
-			this.forceDOP = forceDOP;
-			this.defaultParallelism = defaultParallelism;
-			this.defaultDataExchangeMode = dataExchangeMode;
-
-			this.sinks = new ArrayList<DataSinkNode>(2);
-
-			// reuse the given map when there is one, otherwise start with an empty map
-			this.con2node = (closure != null) ? closure : new HashMap<Operator<?>, OptimizerNode>();
-		}
-
-		/**
-		 * Gets the data sink nodes that this visitor has created so far.
-		 *
-		 * @return The list of data sink nodes (the visitor's internal list, not a copy).
-		 */
-		public List<DataSinkNode> getSinks() {
-			return this.sinks;
-		}
-
-		@SuppressWarnings("deprecation")
-		@Override
-		public boolean preVisit(Operator<?> c) {
-			// check if we have been here before
-			if (this.con2node.containsKey(c)) {
-				return false;
-			}
-
-			final OptimizerNode n;
-
-			// create a node for the operator (or sink or source) if we have not been here before
-			if (c instanceof GenericDataSinkBase) {
-				DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
-				this.sinks.add(dsn);
-				n = dsn;
-			}
-			else if (c instanceof GenericDataSourceBase) {
-				n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
-			}
-			else if (c instanceof MapOperatorBase) {
-				n = new MapNode((MapOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof MapPartitionOperatorBase) {
-				n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
-				n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof FlatMapOperatorBase) {
-				n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof FilterOperatorBase) {
-				n = new FilterNode((FilterOperatorBase<?, ?>) c);
-			}
-			else if (c instanceof ReduceOperatorBase) {
-				n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
-			}
-			else if (c instanceof GroupReduceOperatorBase) {
-				n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof GroupCombineOperatorBase) {
-				n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
-			}
-			else if (c instanceof JoinOperatorBase) {
-				n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
-			}
-			else if (c instanceof CoGroupOperatorBase) {
-				n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
-			}
-			else if (c instanceof CrossOperatorBase) {
-				n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
-			}
-			else if (c instanceof BulkIterationBase) {
-				n = new BulkIterationNode((BulkIterationBase<?>) c);
-			}
-			else if (c instanceof DeltaIterationBase) {
-				n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
-			}
-			else if (c instanceof Union){
-				n = new BinaryUnionNode((Union<?>) c);
-			}
-			else if (c instanceof PartitionOperatorBase) {
-				n = new PartitionNode((PartitionOperatorBase<?>) c);
-			}
-			else if (c instanceof SortPartitionOperatorBase) {
-				n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
-			}
-			else if (c instanceof PartialSolutionPlaceHolder) {
-				if (this.parent == null) {
-					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-				}
-				
-				final PartialSolutionPlaceHolder<?> holder = (PartialSolutionPlaceHolder<?>) c;
-				final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
-				final BulkIterationNode containingIterationNode =
-							(BulkIterationNode) this.parent.con2node.get(enclosingIteration);
-				
-				// catch this for the recursive translation of step functions
-				BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
-				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				n = p;
-			}
-			else if (c instanceof WorksetPlaceHolder) {
-				if (this.parent == null) {
-					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-				}
-				
-				final WorksetPlaceHolder<?> holder = (WorksetPlaceHolder<?>) c;
-				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
-				final WorksetIterationNode containingIterationNode =
-							(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-				
-				// catch this for the recursive translation of step functions
-				WorksetNode p = new WorksetNode(holder, containingIterationNode);
-				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				n = p;
-			}
-			else if (c instanceof SolutionSetPlaceHolder) {
-				if (this.parent == null) {
-					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-				}
-				
-				final SolutionSetPlaceHolder<?> holder = (SolutionSetPlaceHolder<?>) c;
-				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
-				final WorksetIterationNode containingIterationNode =
-							(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-				
-				// catch this for the recursive translation of step functions
-				SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
-				p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-				n = p;
-			}
-			else {
-				throw new IllegalArgumentException("Unknown operator type: " + c);
-			}
-
-			this.con2node.put(c, n);
-			
-			// set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
-			// key-less reducer (all-reduce)
-			if (n.getDegreeOfParallelism() < 1) {
-				// set the degree of parallelism
-				int par = c.getDegreeOfParallelism();
-				if (par > 0) {
-					if (this.forceDOP && par != this.defaultParallelism) {
-						par = this.defaultParallelism;
-						LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
-							"currently fixed to the parallelism of the surrounding operator (the iteration).");
-					}
-				} else {
-					par = this.defaultParallelism;
-				}
-				n.setDegreeOfParallelism(par);
-			}
-
-			return true;
-		}
-
-		@Override
-		public void postVisit(Operator<?> c) {
-			
-			OptimizerNode n = this.con2node.get(c);
-
-			// first connect to the predecessors
-			n.setInput(this.con2node, this.defaultDataExchangeMode);
-			n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
-			
-			// if the node represents a bulk iteration, we recursively translate the data flow now
-			if (n instanceof BulkIterationNode) {
-				final BulkIterationNode iterNode = (BulkIterationNode) n;
-				final BulkIterationBase<?> iter = iterNode.getIterationContract();
-
-				// pass a copy of the no iterative part into the iteration translation,
-				// in case the iteration references its closure
-				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-				// first, recursively build the data flow for the step function
-				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
-					iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);
-				
-				BulkPartialSolutionNode partialSolution;
-				
-				iter.getNextPartialSolution().accept(recursiveCreator);
-				
-				partialSolution =  (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
-				OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
-				if (partialSolution == null) {
-					throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
-				}
-				
-				
-				OptimizerNode terminationCriterion = null;
-				
-				if (iter.getTerminationCriterion() != null) {
-					terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
-					
-					// no intermediate node yet, traverse from the termination criterion to build the missing parts
-					if (terminationCriterion == null) {
-						iter.getTerminationCriterion().accept(recursiveCreator);
-						terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
-					}
-				}
-				
-				iterNode.setPartialSolution(partialSolution);
-				iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
-				
-				// go over the contained data flow and mark the dynamic path nodes
-				StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-				iterNode.acceptForStepFunction(identifier);
-			}
-			else if (n instanceof WorksetIterationNode) {
-				final WorksetIterationNode iterNode = (WorksetIterationNode) n;
-				final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
-
-				// we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
-				// One check is for free during the translation, we do the other check here as a pre-condition
-				{
-					StepFunctionValidator wsf = new StepFunctionValidator();
-					iter.getNextWorkset().accept(wsf);
-					if (!wsf.foundWorkset) {
-						throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
-															"This is a prerequisite in delta iterations.");
-					}
-				}
-				
-				// calculate the closure of the anonymous function
-				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-				// first, recursively build the data flow for the step function
-				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
-						this, true, iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);
-				
-				// descend from the solution set delta. check that it depends on both the workset
-				// and the solution set. If it does depend on both, this descend should create both nodes
-				iter.getSolutionSetDelta().accept(recursiveCreator);
-				
-				final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
-				
-				if (worksetNode == null) {
-					throw new CompilerException("In the given program, the solution set delta does not depend on the workset." +
-														"This is a prerequisite in delta iterations.");
-				}
-				
-				iter.getNextWorkset().accept(recursiveCreator);
-				
-				SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
-				
-				if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
-					solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
-				}
-				else {
-					for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
-						OptimizerNode successor = conn.getTarget();
-					
-						if (successor.getClass() == JoinNode.class) {
-							// find out which input to the match the solution set is
-							JoinNode mn = (JoinNode) successor;
-							if (mn.getFirstPredecessorNode() == solutionSetNode) {
-								mn.makeJoinWithSolutionSet(0);
-							} else if (mn.getSecondPredecessorNode() == solutionSetNode) {
-								mn.makeJoinWithSolutionSet(1);
-							} else {
-								throw new CompilerException();
-							}
-						}
-						else if (successor.getClass() == CoGroupNode.class) {
-							CoGroupNode cg = (CoGroupNode) successor;
-							if (cg.getFirstPredecessorNode() == solutionSetNode) {
-								cg.makeCoGroupWithSolutionSet(0);
-							} else if (cg.getSecondPredecessorNode() == solutionSetNode) {
-								cg.makeCoGroupWithSolutionSet(1);
-							} else {
-								throw new CompilerException();
-							}
-						}
-						else {
-							throw new InvalidProgramException(
-									"Error: The only operations allowed on the solution set are Join and CoGroup.");
-						}
-					}
-				}
-				
-				final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
-				final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
-				
-				// set the step function nodes to the iteration node
-				iterNode.setPartialSolution(solutionSetNode, worksetNode);
-				iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);
-				
-				// go over the contained data flow and mark the dynamic path nodes
-				StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-				iterNode.acceptForStepFunction(pathIdentifier);
-			}
-		}
-	}
-	
-	private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
-		
-		private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
-		
-		private final int costWeight;
-		
-		private StaticDynamicPathIdentifier(int costWeight) {
-			this.costWeight = costWeight;
-		}
-		
-		@Override
-		public boolean preVisit(OptimizerNode visitable) {
-			return this.seenBefore.add(visitable);
-		}
-
-		@Override
-		public void postVisit(OptimizerNode visitable) {
-			visitable.identifyDynamicPath(this.costWeight);
-			
-			// check that there is no nested iteration on the dynamic path
-			if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
-				throw new CompilerException("Nested iterations are currently not supported.");
-			}
-		}
-	}
-	
-	/**
-	 * Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory,
-	 * the number of memory consumers, and on the task's degree of parallelism.
-	 */
-	public static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
-		
-		private final DataStatistics statistics;
-
-		private int id = 1;
-		
-		public IdAndEstimatesVisitor(DataStatistics statistics) {
-			this.statistics = statistics;
-		}
-
-		@Override
-		public boolean preVisit(OptimizerNode visitable) {
-			return visitable.getId() == -1;
-		}
-
-		@Override
-		public void postVisit(OptimizerNode visitable) {
-			// the node ids
-			visitable.initId(this.id++);
-			
-			// connections need to figure out their maximum path depths
-			for (PactConnection conn : visitable.getIncomingConnections()) {
-				conn.initMaxDepth();
-			}
-			for (PactConnection conn : visitable.getBroadcastConnections()) {
-				conn.initMaxDepth();
-			}
-			
-			// the estimates
-			visitable.computeOutputEstimates(this.statistics);
-			
-			// if required, recurse into the step function
-			if (visitable instanceof IterationNode) {
-				((IterationNode) visitable).acceptForStepFunction(this);
-			}
-		}
-	}
-	
-	/**
-	 * Visitor that computes the interesting properties for each node in the plan. On its recursive
-	 * depth-first descend, it propagates all interesting properties top-down.
-	 */
-	public static final class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-		
-		private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
-
-		/**
-		 * Creates a new visitor that computes the interesting properties for all nodes in the plan.
-		 * It uses the given cost estimator used to compute the maximal costs for an interesting property.
-		 * 
-		 * @param estimator
-		 *        The cost estimator to estimate the maximal costs for interesting properties.
-		 */
-		public InterestingPropertyVisitor(CostEstimator estimator) {
-			this.estimator = estimator;
-		}
-		
-		@Override
-		public boolean preVisit(OptimizerNode node) {
-			// The interesting properties must be computed on the descend. In case a node has multiple outputs,
-			// that computation must happen during the last descend.
-
-			if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
-				node.computeUnionOfInterestingPropertiesFromSuccessors();
-				node.computeInterestingPropertiesForInputs(this.estimator);
-				return true;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public void postVisit(OptimizerNode visitable) {}
-	}
-
-	/**
-	 * On its re-ascend (post visit) this visitor, computes auxiliary maps that are needed to support plans
-	 * that are not a minimally connected DAG (Such plans are not trees, but at least one node feeds its
-	 * output into more than one other node).
-	 */
-	public static final class BranchesVisitor implements Visitor<OptimizerNode> {
-		
-		@Override
-		public boolean preVisit(OptimizerNode node) {
-			return node.getOpenBranches() == null;
-		}
-
-		@Override
-		public void postVisit(OptimizerNode node) {
-			if (node instanceof IterationNode) {
-				((IterationNode) node).acceptForStepFunction(this);
-			}
-
-			node.computeUnclosedBranchStack();
-		}
-	}
-	
-	/**
-	 * Finalization of the plan:
-	 *  - The graph of nodes is double-linked (links from child to parent are inserted)
-	 *  - If unions join static and dynamic paths, the cache is marked as a memory consumer
-	 *  - Relative memory fractions are assigned to all nodes.
-	 *  - All nodes are collected into a set.
-	 */
-	private static final class PlanFinalizer implements Visitor<PlanNode> {
-		
-		private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
-
-		private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
-
-		private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
-		
-		private final Deque<IterationPlanNode> stackOfIterationNodes;
-
-		private int memoryConsumerWeights; // a counter of all memory consumers
-
-		/**
-		 * Creates a new plan finalizer.
-		 */
-		private PlanFinalizer() {
-			this.allNodes = new HashSet<PlanNode>();
-			this.sources = new ArrayList<SourcePlanNode>();
-			this.sinks = new ArrayList<SinkPlanNode>();
-			this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
-		}
-
-		private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
-			this.memoryConsumerWeights = 0;
-			
-			// traverse the graph
-			for (SinkPlanNode node : sinks) {
-				node.accept(this);
-			}
-
-			// assign the memory to each node
-			if (this.memoryConsumerWeights > 0) {
-				for (PlanNode node : this.allNodes) {
-					// assign memory to the driver strategy of the node
-					final int consumerWeight = node.getMemoryConsumerWeight();
-					if (consumerWeight > 0) {
-						final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
-						node.setRelativeMemoryPerSubtask(relativeMem);
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
-								node.getPactContract().getName() + ".");
-						}
-					}
-					
-					// assign memory to the local and global strategies of the channels
-					for (Channel c : node.getInputs()) {
-						if (c.getLocalStrategy().dams()) {
-							final double relativeMem = 1.0 / this.memoryConsumerWeights;
-							c.setRelativeMemoryLocalStrategy(relativeMem);
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
-										"instance of " + c + ".");
-							}
-						}
-						if (c.getTempMode() != TempMode.NONE) {
-							final double relativeMem = 1.0/ this.memoryConsumerWeights;
-							c.setRelativeTempMemory(relativeMem);
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
-										"table for " + c + ".");
-							}
-						}
-					}
-				}
-			}
-			return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
-		}
-
-		@Override
-		public boolean preVisit(PlanNode visitable) {
-			// if we come here again, prevent a further descend
-			if (!this.allNodes.add(visitable)) {
-				return false;
-			}
-			
-			if (visitable instanceof SinkPlanNode) {
-				this.sinks.add((SinkPlanNode) visitable);
-			}
-			else if (visitable instanceof SourcePlanNode) {
-				this.sources.add((SourcePlanNode) visitable);
-			}
-			else if (visitable instanceof BinaryUnionPlanNode) {
-				BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
-				if (unionNode.unionsStaticAndDynamicPath()) {
-					unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
-				}
-			}
-			else if (visitable instanceof BulkPartialSolutionPlanNode) {
-				// tell the partial solution about the iteration node that contains it
-				final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
-				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-				
-				// sanity check!
-				if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
-					throw new CompilerException("Bug: Error finalizing the plan. " +
-							"Cannot associate the node for a partial solutions with its containing iteration.");
-				}
-				pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
-			}
-			else if (visitable instanceof WorksetPlanNode) {
-				// tell the partial solution about the iteration node that contains it
-				final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
-				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-				
-				// sanity check!
-				if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
-					throw new CompilerException("Bug: Error finalizing the plan. " +
-							"Cannot associate the node for a partial solutions with its containing iteration.");
-				}
-				wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-			}
-			else if (visitable instanceof SolutionSetPlanNode) {
-				// tell the partial solution about the iteration node that contains it
-				final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
-				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-				
-				// sanity check!
-				if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
-					throw new CompilerException("Bug: Error finalizing the plan. " +
-							"Cannot associate the node for a partial solutions with its containing iteration.");
-				}
-				sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-			}
-			
-			// double-connect the connections. previously, only parents knew their children, because
-			// one child candidate could have been referenced by multiple parents.
-			for (Channel conn : visitable.getInputs()) {
-				conn.setTarget(visitable);
-				conn.getSource().addOutgoingChannel(conn);
-			}
-			
-			for (Channel c : visitable.getBroadcastInputs()) {
-				c.setTarget(visitable);
-				c.getSource().addOutgoingChannel(c);
-			}
-
-			// count the memory consumption
-			this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
-			for (Channel c : visitable.getInputs()) {
-				if (c.getLocalStrategy().dams()) {
-					this.memoryConsumerWeights++;
-				}
-				if (c.getTempMode() != TempMode.NONE) {
-					this.memoryConsumerWeights++;
-				}
-			}
-			for (Channel c : visitable.getBroadcastInputs()) {
-				if (c.getLocalStrategy().dams()) {
-					this.memoryConsumerWeights++;
-				}
-				if (c.getTempMode() != TempMode.NONE) {
-					this.memoryConsumerWeights++;
-				}
-			}
-			
-			// pass the visitor to the iteraton's step function
-			if (visitable instanceof IterationPlanNode) {
-				// push the iteration node onto the stack
-				final IterationPlanNode iterNode = (IterationPlanNode) visitable;
-				this.stackOfIterationNodes.addLast(iterNode);
-				
-				// recurse
-				((IterationPlanNode) visitable).acceptForStepFunction(this);
-				
-				// pop the iteration node from the stack
-				this.stackOfIterationNodes.removeLast();
-			}
-			return true;
-		}
-
-		@Override
-		public void postVisit(PlanNode visitable) {}
-	}
-	
-	/**
-	 * A visitor that traverses the graph and collects cascading binary unions into a single n-ary
-	 * union operator. The exception is, when on of the union inputs is materialized, such as in the
-	 * static-code-path-cache in iterations.
-	 */
-	private static final class BinaryUnionReplacer implements Visitor<PlanNode> {
-		
-		private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();
-
-		@Override
-		public boolean preVisit(PlanNode visitable) {
-			if (this.seenBefore.add(visitable)) {
-				if (visitable instanceof IterationPlanNode) {
-					((IterationPlanNode) visitable).acceptForStepFunction(this);
-				}
-				return true;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public void postVisit(PlanNode visitable) {
-			
-			if (visitable instanceof BinaryUnionPlanNode) {
-				
-				final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
-				final Channel in1 = unionNode.getInput1();
-				final Channel in2 = unionNode.getInput2();
-			
-				if (!unionNode.unionsStaticAndDynamicPath()) {
-					
-					// both on static path, or both on dynamic path. we can collapse them
-					NAryUnionPlanNode newUnionNode;
-
-					List<Channel> inputs = new ArrayList<Channel>();
-					collect(in1, inputs);
-					collect(in2, inputs);
-
-					newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, 
-							unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
-					
-					newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());
-
-					for (Channel c : inputs) {
-						c.setTarget(newUnionNode);
-					}
-
-					for (Channel channel : unionNode.getOutgoingChannels()) {
-						channel.swapUnionNodes(newUnionNode);
-						newUnionNode.addOutgoingChannel(channel);
-					}
-				}
-				else {
-					// union between the static and the dynamic path. we need to handle this for now
-					// through a special union operator
-					
-					// make sure that the first input is the cached (static) and the second input is the dynamic
-					if (in1.isOnDynamicPath()) {
-						BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);
-						
-						in1.setTarget(newUnionNode);
-						in2.setTarget(newUnionNode);
-						
-						for (Channel channel : unionNode.getOutgoingChannels()) {
-							channel.swapUnionNodes(newUnionNode);
-							newUnionNode.addOutgoingChannel(channel);
-						}
-					}
-				}
-			}
-		}
-		
-		private void collect(Channel in, List<Channel> inputs) {
-			if (in.getSource() instanceof NAryUnionPlanNode) {
-				// sanity check
-				if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
-					throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
-				}
-				if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) {
-					throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
-				}
-				
-				inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
-			} else {
-				// is not a collapsed union node, so we take the channel directly
-				inputs.add(in);
-			}
-		}
-	}
-	
-	private static final class StepFunctionValidator implements Visitor<Operator<?>> {
-
-		private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
-		
-		private boolean foundWorkset;
-		
-		@Override
-		public boolean preVisit(Operator<?> visitable) {
-			if (visitable instanceof WorksetPlaceHolder) {
-				foundWorkset = true;
-			}
-			
-			return (!foundWorkset) && seenBefore.add(visitable);
-		}
-
-		@Override
-		public void postVisit(Operator<?> visitable) {}
-	}
-
-	// ------------------------------------------------------------------------
-	// Miscellaneous
-	// ------------------------------------------------------------------------
-	
-	private OptimizerPostPass getPostPassFromPlan(Plan program) {
-		final String className =  program.getPostPassClassName();
-		if (className == null) {
-			throw new CompilerException("Optimizer Post Pass class description is null");
-		}
-		try {
-			Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
-			try {
-				return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
-			} catch (RuntimeException rtex) {
-				// unwrap the source exception
-				if (rtex.getCause() != null) {
-					throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
-				} else {
-					throw rtex;
-				}
-			}
-		} catch (ClassNotFoundException cnfex) {
-			throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
-		} catch (ClassCastException ccex) {
-			throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
index f8404c4..d199ae7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.Collections;
@@ -62,8 +61,8 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
 	}
 
 	@Override
-	public List<PactConnection> getIncomingConnections() {
-		return Collections.<PactConnection>emptyList();
+	public List<DagConnection> getIncomingConnections() {
+		return Collections.emptyList();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 8aed10c..068799e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -107,7 +107,7 @@ public class BinaryUnionNode extends TwoInputNode {
 		final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
 		final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
 
-		List<PactConnection> broadcastConnections = getBroadcastConnections();
+		List<DagConnection> broadcastConnections = getBroadcastConnections();
 		if (broadcastConnections != null && broadcastConnections.size() > 0) {
 			throw new CompilerException("Found BroadcastVariables on a Union operation");
 		}
@@ -122,9 +122,9 @@ public class BinaryUnionNode extends TwoInputNode {
 		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
 		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
 
-		final int dop = getDegreeOfParallelism();
-		final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
-		final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
+		final int dop = getParallelism();
+		final int inDop1 = getFirstPredecessorNode().getParallelism();
+		final int inDop2 = getSecondPredecessorNode().getParallelism();
 
 		final boolean dopChange1 = dop != inDop1;
 		final boolean dopChange2 = dop != inDop2;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 8112748..c55d17a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -62,9 +62,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	
 	private OptimizerNode nextPartialSolution;
 	
-	private PactConnection rootConnection;		// connection out of the next partial solution
+	private DagConnection rootConnection;		// connection out of the next partial solution
 	
-	private PactConnection terminationCriterionRootConnection;	// connection out of the term. criterion
+	private DagConnection terminationCriterionRootConnection;	// connection out of the term. criterion
 	
 	private OptimizerNode singleRoot;
 	
@@ -93,7 +93,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	public BulkIterationBase<?> getIterationContract() {
-		return (BulkIterationBase<?>) getPactContract();
+		return (BulkIterationBase<?>) getOperator();
 	}
 	
 	/**
@@ -133,14 +133,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		
 		// check if the root of the step function has the same DOP as the iteration
 		// or if the step function has any operator at all
-		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
+		if (nextPartialSolution.getParallelism() != getParallelism() ||
 			nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
 		{
 			// add a no-op to the root to express the re-partitioning
 			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getDegreeOfParallelism());
+			noop.setDegreeOfParallelism(getParallelism());
 
-			PactConnection noOpConn = new PactConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
+			DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
 			noop.setIncomingConnection(noOpConn);
 			nextPartialSolution.addOutgoingConnection(noOpConn);
 			
@@ -152,13 +152,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		
 		if (terminationCriterion == null) {
 			this.singleRoot = nextPartialSolution;
-			this.rootConnection = new PactConnection(nextPartialSolution, ExecutionMode.PIPELINED);
+			this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
 		}
 		else {
 			// we have a termination criterion
 			SingleRootJoiner singleRootJoiner = new SingleRootJoiner();
-			this.rootConnection = new PactConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
-			this.terminationCriterionRootConnection = new PactConnection(terminationCriterion, singleRootJoiner,
+			this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
+			this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner,
 																		ExecutionMode.PIPELINED);
 
 			singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection);
@@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 					locPropsReq.parameterizeChannel(toNoOp);
 					
 					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
-					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
 					
 					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
 					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
@@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
 		if (terminationCriterion == null) {
 			for (PlanNode candidate : candidates) {
-				BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate);
+				BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
 				GlobalProperties gProps = candidate.getGlobalProperties().clone();
 				LocalProperties lProps = candidate.getLocalProperties().clone();
 				node.initProperties(gProps, lProps);
@@ -367,7 +367,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 			for (PlanNode candidate : candidates) {
 				for (PlanNode terminationCandidate : terminationCriterionCandidates) {
 					if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
-						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
+						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
 						GlobalProperties gProps = candidate.getGlobalProperties().clone();
 						LocalProperties lProps = candidate.getLocalProperties().clone();
 						node.initProperties(gProps, lProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
index a6e03ff..25a7eef 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.Collections;
@@ -49,7 +48,8 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
 		if (this.cachedPlans != null) {
 			throw new IllegalStateException();
 		} else {
-			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
+					"PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput));
 		}
 	}
 	
@@ -73,13 +73,14 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Gets the contract object for this data source node.
+	 * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this
+	 * optimizer node.
 	 * 
-	 * @return The contract.
+	 * @return The operator represented by this optimizer node.
 	 */
 	@Override
-	public PartialSolutionPlaceHolder<?> getPactContract() {
-		return (PartialSolutionPlaceHolder<?>) super.getPactContract();
+	public PartialSolutionPlaceHolder<?> getOperator() {
+		return (PartialSolutionPlaceHolder<?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 7c0cc9a..92076c3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -50,8 +50,8 @@ public class CoGroupNode extends TwoInputNode {
 	 * @return The CoGroup operator.
 	 */
 	@Override
-	public CoGroupOperatorBase<?, ?, ?, ?> getPactContract() {
-		return (CoGroupOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
+		return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
 	}
 
 	@Override
@@ -85,7 +85,7 @@ public class CoGroupNode extends TwoInputNode {
 		Ordering groupOrder1 = null;
 		Ordering groupOrder2 = null;
 		
-		CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract();
+		CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
 		groupOrder1 = cgc.getGroupOrderForInputOne();
 		groupOrder2 = cgc.getGroupOrderForInputTwo();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
index afeed1d..8de67e8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
 import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
 import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
@@ -50,7 +50,7 @@ public class CrossNode extends TwoInputNode {
 		super(operation);
 		
 		Configuration conf = operation.getParameters();
-		String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 	
 		CrossHint hint = operation.getCrossHint();
 		
@@ -60,13 +60,13 @@ public class CrossNode extends TwoInputNode {
 			final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;
 			
 			final OperatorDescriptorDual fixedDriverStrat;
-			if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
+			if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
 				fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
 				fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
 				fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
 				fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
 			} else {
 				throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
@@ -99,8 +99,8 @@ public class CrossNode extends TwoInputNode {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public CrossOperatorBase<?, ?, ?, ?> getPactContract() {
-		return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	public CrossOperatorBase<?, ?, ?, ?> getOperator() {
+		return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
new file mode 100644
index 0000000..360f579
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange has a mode in which it performs (batch / pipelined).
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
+ */
+public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
+	
+	private final OptimizerNode source; // The source node of the connection
+
+	private final OptimizerNode target; // The target node of the connection.
+
+	private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
+	private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
+
+	private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
+	
+	private TempMode materializationMode = TempMode.NONE; // the materialization mode
+	
+	private int maxDepth = -1;
+
+	private boolean breakPipeline;  // whether this connection should break the pipeline due to potential deadlocks
+
+	/**
+	 * Creates a new Connection between two nodes. No shipping strategy is predefined (it is
+	 * left <tt>null</tt>). The temp mode is by default <tt>NONE</tt>.
+	 * 
+	 * @param source
+	 *        The source node.
+	 * @param target
+	 *        The target node.
+	 */
+	public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+		this(source, target, null, exchangeMode);
+	}
+
+	/**
+	 * Creates a new Connection between two nodes.
+	 * 
+	 * @param source
+	 *        The source node.
+	 * @param target
+	 *        The target node.
+	 * @param shipStrategy
+	 *        The shipping strategy.
+	 * @param exchangeMode
+	 *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+	 */
+	public DagConnection(OptimizerNode source, OptimizerNode target,
+						 ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+	{
+		if (source == null || target == null) {
+			throw new NullPointerException("Source and target must not be null.");
+		}
+		this.source = source;
+		this.target = target;
+		this.shipStrategy = shipStrategy;
+		this.dataExchangeMode = exchangeMode;
+	}
+	
+	/**
+	 * Constructor to create a result from an operator that is not
+	 * consumed by another operator.
+	 * 
+	 * @param source
+	 *        The source node.
+	 */
+	public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
+		if (source == null) {
+			throw new NullPointerException("Source and target must not be null.");
+		}
+		this.source = source;
+		this.target = null;
+		this.shipStrategy = ShipStrategyType.NONE;
+		this.dataExchangeMode = exchangeMode;
+	}
+
+	/**
+	 * Gets the source of the connection.
+	 * 
+	 * @return The source Node.
+	 */
+	public OptimizerNode getSource() {
+		return this.source;
+	}
+
+	/**
+	 * Gets the target of the connection.
+	 * 
+	 * @return The target node.
+	 */
+	public OptimizerNode getTarget() {
+		return this.target;
+	}
+
+	/**
+	 * Gets the shipping strategy for this connection.
+	 * 
+	 * @return The connection's shipping strategy.
+	 */
+	public ShipStrategyType getShipStrategy() {
+		return this.shipStrategy;
+	}
+
+	/**
+	 * Sets the shipping strategy for this connection.
+	 * 
+	 * @param strategy
+	 *        The shipping strategy to be applied to this connection.
+	 */
+	public void setShipStrategy(ShipStrategyType strategy) {
+		this.shipStrategy = strategy;
+	}
+
+	/**
+	 * Gets the data exchange mode to use for this connection.
+	 *
+	 * @return The data exchange mode to use for this connection.
+	 */
+	public ExecutionMode getDataExchangeMode() {
+		if (dataExchangeMode == null) {
+			throw new IllegalStateException("This connection does not have the data exchange mode set");
+		}
+		return dataExchangeMode;
+	}
+
+	/**
+	 * Marks that this connection should do a decoupled data exchange (such as batched)
+	 * rather than pipeline data. Connections are marked as pipeline breakers to avoid
+	 * deadlock situations.
+	 */
+	public void markBreaksPipeline() {
+		this.breakPipeline = true;
+	}
+
+	/**
+	 * Checks whether this connection is marked to break the pipeline.
+	 *
+	 * @return True, if this connection is marked to break the pipeline, false otherwise.
+	 */
+	public boolean isBreakingPipeline() {
+		return this.breakPipeline;
+	}
+
+	/**
+	 * Gets the interesting properties object for this connection.
+	 * If the interesting properties for this connection have not yet been set,
+	 * this method returns null.
+	 * 
+	 * @return The collection of all interesting properties, or null, if they have not yet been set.
+	 */
+	public InterestingProperties getInterestingProperties() {
+		return this.interestingProps;
+	}
+
+	/**
+	 * Sets the interesting properties for this connection.
+	 * 
+	 * @param props The interesting properties.
+	 */
+	public void setInterestingProperties(InterestingProperties props) {
+		if (this.interestingProps == null) {
+			this.interestingProps = props;
+		} else {
+			throw new IllegalStateException("Interesting Properties have already been set.");
+		}
+	}
+	
+	public void clearInterestingProperties() {
+		this.interestingProps = null;
+	}
+	
+	public void initMaxDepth() {
+		
+		if (this.maxDepth == -1) {
+			this.maxDepth = this.source.getMaxDepth() + 1;
+		} else {
+			throw new IllegalStateException("Maximum path depth has already been initialized.");
+		}
+	}
+	
+	public int getMaxDepth() {
+		if (this.maxDepth != -1) {
+			return this.maxDepth;
+		} else {
+			throw new IllegalStateException("Maximum path depth has not been initialized.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Estimates
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public long getEstimatedOutputSize() {
+		return this.source.getEstimatedOutputSize();
+	}
+
+	@Override
+	public long getEstimatedNumRecords() {
+		return this.source.getEstimatedNumRecords();
+	}
+	
+	@Override
+	public float getEstimatedAvgWidthPerOutputRecord() {
+		return this.source.getEstimatedAvgWidthPerOutputRecord();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	
+	public TempMode getMaterializationMode() {
+		return this.materializationMode;
+	}
+	
+	public void setMaterializationMode(TempMode materializationMode) {
+		this.materializationMode = materializationMode;
+	}
+	
+	public boolean isOnDynamicPath() {
+		return this.source.isOnDynamicPath();
+	}
+	
+	public int getCostWeight() {
+		return this.source.getCostWeight();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public String toString() {
+		StringBuilder buf = new StringBuilder(50);
+		buf.append("Connection: ");
+
+		if (this.source == null) {
+			buf.append("null");
+		} else {
+			buf.append(this.source.getOperator().getName());
+			buf.append('(').append(this.source.getName()).append(')');
+		}
+
+		buf.append(" -> ");
+
+		if (this.shipStrategy != null) {
+			buf.append('[');
+			buf.append(this.shipStrategy.name());
+			buf.append(']').append(' ');
+		}
+
+		if (this.target == null) {
+			buf.append("null");
+		} else {
+			buf.append(this.target.getOperator().getName());
+			buf.append('(').append(this.target.getName()).append(')');
+		}
+
+		return buf.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index 9e4f457..dbe04f4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -46,7 +46,7 @@ import org.apache.flink.util.Visitor;
  */
 public class DataSinkNode extends OptimizerNode {
 	
-	protected PactConnection input;			// The input edge
+	protected DagConnection input;			// The input edge
 	
 	/**
 	 * Creates a new DataSinkNode for the given sink operator.
@@ -64,7 +64,7 @@ public class DataSinkNode extends OptimizerNode {
 	 * 
 	 * @return The input connection.
 	 */
-	public PactConnection getInputConnection() {
+	public DagConnection getInputConnection() {
 		return this.input;
 	}
 	
@@ -87,8 +87,8 @@ public class DataSinkNode extends OptimizerNode {
 	 * @return The node's underlying operator.
 	 */
 	@Override
-	public GenericDataSinkBase<?> getPactContract() {
-		return (GenericDataSinkBase<?>) super.getPactContract();
+	public GenericDataSinkBase<?> getOperator() {
+		return (GenericDataSinkBase<?>) super.getOperator();
 	}
 
 	@Override
@@ -97,7 +97,7 @@ public class DataSinkNode extends OptimizerNode {
 	}
 
 	@Override
-	public List<PactConnection> getIncomingConnections() {
+	public List<DagConnection> getIncomingConnections() {
 		return Collections.singletonList(this.input);
 	}
 
@@ -107,19 +107,19 @@ public class DataSinkNode extends OptimizerNode {
 	 * @return An empty list.
 	 */
 	@Override
-	public List<PactConnection> getOutgoingConnections() {
+	public List<DagConnection> getOutgoingConnections() {
 		return Collections.emptyList();
 	}
 
 	@Override
 	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
-		Operator<?> children = getPactContract().getInput();
+		Operator<?> children = getOperator().getInput();
 
 		final OptimizerNode pred;
-		final PactConnection conn;
+		final DagConnection conn;
 		
 		pred = contractToNode.get(children);
-		conn = new PactConnection(pred, this, defaultExchangeMode);
+		conn = new DagConnection(pred, this, defaultExchangeMode);
 			
 		// create the connection and add it
 		this.input = conn;
@@ -141,8 +141,8 @@ public class DataSinkNode extends OptimizerNode {
 		final InterestingProperties iProps = new InterestingProperties();
 		
 		{
-			final Ordering partitioning = getPactContract().getPartitionOrdering();
-			final DataDistribution dataDist = getPactContract().getDataDistribution();
+			final Ordering partitioning = getOperator().getPartitionOrdering();
+			final DataDistribution dataDist = getOperator().getDataDistribution();
 			final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
 			if (partitioning != null) {
 				if(dataDist != null) {
@@ -156,7 +156,7 @@ public class DataSinkNode extends OptimizerNode {
 		}
 		
 		{
-			final Ordering localOrder = getPactContract().getLocalOrder();
+			final Ordering localOrder = getOperator().getLocalOrder();
 			final RequestedLocalProperties orderProps = new RequestedLocalProperties();
 			if (localOrder != null) {
 				orderProps.setOrdering(localOrder);
@@ -184,7 +184,7 @@ public class DataSinkNode extends OptimizerNode {
 	}
 	
 	@Override
-	protected List<UnclosedBranchDescriptor> getBranchesForParent(PactConnection parent) {
+	protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
 		// return our own stack of open branches, because nothing is added
 		return this.openBranches;
 	}
@@ -204,8 +204,8 @@ public class DataSinkNode extends OptimizerNode {
 		List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
 		List<PlanNode> outputPlans = new ArrayList<PlanNode>();
 		
-		final int dop = getDegreeOfParallelism();
-		final int inDop = getPredecessorNode().getDegreeOfParallelism();
+		final int dop = getParallelism();
+		final int inDop = getPredecessorNode().getParallelism();
 
 		final ExecutionMode executionMode = this.input.getDataExchangeMode();
 		final boolean dopChange = dop != inDop;
@@ -224,7 +224,7 @@ public class DataSinkNode extends OptimizerNode {
 					// no need to check whether the created properties meet what we need in case
 					// of ordering or global ordering, because the only interesting properties we have
 					// are what we require
-					outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getPactContract().getName()+")" ,c));
+					outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
 				}
 			}
 		}