You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:50 UTC

[11/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
new file mode 100644
index 0000000..04bc527
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -0,0 +1,1578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.aggregators.AggregatorWithName;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
+import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.DataSinkTask;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.MatchDriver;
+import org.apache.flink.runtime.operators.NoOpDriver;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.chaining.ChainedDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Visitor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
+ * to a {@link org.apache.flink.runtime.jobgraph.JobGraph}. The translation is not strictly a one-to-one,
+ * because some nodes from the OptimizedPlan are collapsed into one job vertex.
+ *
+ * This translation does not make any decisions or assumptions. All degrees-of-freedom in the execution
+ * of the job are made by the Optimizer, so that this translation becomes a deterministic mapping.
+ *
+ * The basic method of operation is a top down traversal over the plan graph. On the way down, job vertices
+ * are created for the plan nodes, on the way back up, the nodes connect their predecessors.
+ */
+public class JobGraphGenerator implements Visitor<PlanNode> {
+	
+	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
+	
+	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
+	
+	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
+	
+	// ------------------------------------------------------------------------
+
+	private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to job vertices
+	
+	private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to job vertices
+	
+	private Map<IterationPlanNode, IterationDescriptor> iterations;
+	
+	private List<TaskInChain> chainedTasksInSequence;
+	
+	private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
+	
+	private final int defaultMaxFan;
+	
+	private final float defaultSortSpillingThreshold;
+	
+	private int iterationIdEnumerator = 1;
+	
+	private IterationPlanNode currentIteration; // the current the enclosing iteration
+	
+	private List<IterationPlanNode> iterationStack;  // stack of enclosing iterations
+	
+	private SlotSharingGroup sharingGroup;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new job graph generator that uses the default values for its resource configuration.
+	 */
+	public JobGraphGenerator() {
+		this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
+		this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
+	}
+	
+	public JobGraphGenerator(Configuration config) {
+		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, 
+				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
+			ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+	}
+
+	/**
+	 * Translates a {@link org.apache.flink.optimizer.plan.OptimizedPlan} into a
+	 * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+	 * 
+	 * @param program Optimized plan that is translated into a JobGraph.
+	 * @return JobGraph generated frmo the plan.
+	 */
+	public JobGraph compileJobGraph(OptimizedPlan program) {
+		this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
+		this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
+		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
+		this.auxVertices = new ArrayList<AbstractJobVertex>();
+		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
+		this.iterationStack = new ArrayList<IterationPlanNode>();
+		
+		this.sharingGroup = new SlotSharingGroup();
+		
+		// this starts the traversal that generates the job graph
+		program.accept(this);
+		
+		// sanity check that we are not somehow in an iteration at the end
+		if (this.currentIteration != null) {
+			throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
+		}
+		
+		// finalize the iterations
+		for (IterationDescriptor iteration : this.iterations.values()) {
+			if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
+				finalizeBulkIteration(iteration);
+			} else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
+				finalizeWorksetIteration(iteration);
+			} else {
+				throw new CompilerException();
+			}
+		}
+		
+		// now that the traversal is done, we have the chained tasks write their configs into their
+		// parents' configurations
+		for (TaskInChain tic : this.chainedTasksInSequence) {
+			TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
+			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
+		}
+		
+		// create the job graph object
+		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
+		graph.setAllowQueuedScheduling(false);
+		
+		// add vertices to the graph
+		for (AbstractJobVertex vertex : this.vertices.values()) {
+			graph.addVertex(vertex);
+		}
+		
+		for (AbstractJobVertex vertex : this.auxVertices) {
+			graph.addVertex(vertex);
+			vertex.setSlotSharingGroup(sharingGroup);
+		}
+		
+		// add registered cache file into job configuration
+		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
+		}
+
+		try {
+			InstantiationUtil.writeObjectToConfig(
+					program.getOriginalPactPlan().getExecutionConfig(),
+					graph.getJobConfiguration(),
+					ExecutionConfig.CONFIG_KEY);
+		} catch (IOException e) {
+			throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
+		}
+
+		// release all references again
+		this.vertices = null;
+		this.chainedTasks = null;
+		this.chainedTasksInSequence = null;
+		this.auxVertices = null;
+		this.iterations = null;
+		this.iterationStack = null;
+		
+		// return job graph
+		return graph;
+	}
+	
+	/**
+	 * This methods implements the pre-visiting during a depth-first traversal. It create the job vertex and
+	 * sets local strategy.
+	 * 
+	 * @param node
+	 *        The node that is currently processed.
+	 * @return True, if the visitor should descend to the node's children, false if not.
+	 * @see org.apache.flink.util.Visitor#preVisit(org.apache.flink.util.Visitable)
+	 */
+	@Override
+	public boolean preVisit(PlanNode node) {
+		// check if we have visited this node before. in non-tree graphs, this happens
+		if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) {
+			// return false to prevent further descend
+			return false;
+		}
+
+		// the vertex to be created for the current node
+		final AbstractJobVertex vertex;
+		try {
+			if (node instanceof SinkPlanNode) {
+				vertex = createDataSinkVertex((SinkPlanNode) node);
+			}
+			else if (node instanceof SourcePlanNode) {
+				vertex = createDataSourceVertex((SourcePlanNode) node);
+			}
+			else if (node instanceof BulkIterationPlanNode) {
+				BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
+				// for the bulk iteration, we skip creating anything for now. we create the graph
+				// for the step function in the post visit.
+				
+				// check that the root of the step function has the same DOP as the iteration.
+				// because the tail must have the same DOP as the head, we can only merge the last
+				// operator with the tail, if they have the same DOP. not merging is currently not
+				// implemented
+				PlanNode root = iterationNode.getRootOfStepFunction();
+				if (root.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("Error: The final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof WorksetIterationPlanNode) {
+				WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
+
+				// we have the same constraints as for the bulk iteration
+				PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
+				PlanNode solutionSetDelta  = iterationNode.getSolutionSetDeltaPlanNode();
+				
+				if (nextWorkSet.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				if (solutionSetDelta.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof SingleInputPlanNode) {
+				vertex = createSingleInputVertex((SingleInputPlanNode) node);
+			}
+			else if (node instanceof DualInputPlanNode) {
+				vertex = createDualInputVertex((DualInputPlanNode) node);
+			}
+			else if (node instanceof NAryUnionPlanNode) {
+				// skip the union for now
+				vertex = null;
+			}
+			else if (node instanceof BulkPartialSolutionPlanNode) {
+				// create a head node (or not, if it is merged into its successor)
+				vertex = createBulkIterationHead((BulkPartialSolutionPlanNode) node);
+			}
+			else if (node instanceof SolutionSetPlanNode) {
+				// this represents an access into the solution set index.
+				// we do not create a vertex for the solution set here (we create the head at the workset place holder)
+				
+				// we adjust the joins / cogroups that go into the solution set here
+				for (Channel c : node.getOutgoingChannels()) {
+					DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
+					AbstractJobVertex accessingVertex = this.vertices.get(target);
+					TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());
+					int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1;
+					
+					// sanity checks
+					if (inputNum == -1) {
+						throw new CompilerException();
+					}
+					
+					// adjust the driver
+					if (conf.getDriver().equals(MatchDriver.class)) {
+						conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
+					}
+					else if (conf.getDriver().equals(CoGroupDriver.class)) {
+						conf.setDriver(inputNum == 0 ? CoGroupWithSolutionSetFirstDriver.class : CoGroupWithSolutionSetSecondDriver.class);
+					}
+					else {
+						throw new CompilerException("Found join with solution set using incompatible operator (only Join/CoGroup are valid).");
+					}
+				}
+				
+				// make sure we do not visit this node again. for that, we add a 'already seen' entry into one of the sets
+				this.chainedTasks.put(node, ALREADY_VISITED_PLACEHOLDER);
+				
+				vertex = null;
+			}
+			else if (node instanceof WorksetPlanNode) {
+				// create the iteration head here
+				vertex = createWorksetIterationHead((WorksetPlanNode) node);
+			}
+			else {
+				throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
+		}
+		
+		// check if a vertex was created, or if it was chained or skipped
+		if (vertex != null) {
+			// set degree of parallelism
+			int pd = node.getParallelism();
+			vertex.setParallelism(pd);
+			
+			vertex.setSlotSharingGroup(sharingGroup);
+			
+			// check whether this vertex is part of an iteration step function
+			if (this.currentIteration != null) {
+				// check that the task has the same DOP as the iteration as such
+				PlanNode iterationNode = (PlanNode) this.currentIteration;
+				if (iterationNode.getParallelism() < pd) {
+					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
+				}
+
+				// store the id of the iterations the step functions participate in
+				IterationDescriptor descr = this.iterations.get(this.currentIteration);
+				new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
+			}
+	
+			// store in the map
+			this.vertices.put(node, vertex);
+		}
+
+		// returning true causes deeper descend
+		return true;
+	}
+
+	/**
+	 * This method implements the post-visit during the depth-first traversal. When the post visit happens,
+	 * all of the descendants have been processed, so this method connects all of the current node's
+	 * predecessors to the current node.
+	 * 
+	 * @param node
+	 *        The node currently processed during the post-visit.
+	 * @see org.apache.flink.util.Visitor#postVisit(org.apache.flink.util.Visitable) t
+	 */
+	@Override
+	public void postVisit(PlanNode node) {
+		try {
+			// --------- check special cases for which we handle post visit differently ----------
+			
+			// skip data source node (they have no inputs)
+			// also, do nothing for union nodes, we connect them later when gathering the inputs for a task
+			// solution sets have no input. the initial solution set input is connected when the iteration node is in its postVisit
+			if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
+				return;
+			}
+			
+			// check if we have an iteration. in that case, translate the step function now
+			if (node instanceof IterationPlanNode) {
+				// prevent nested iterations
+				if (node.isOnDynamicPath()) {
+					throw new CompilerException("Nested Iterations are not possible at the moment!");
+				}
+				
+				// if we recursively go into an iteration (because the constant path of one iteration contains
+				// another one), we push the current one onto the stack
+				if (this.currentIteration != null) {
+					this.iterationStack.add(this.currentIteration);
+				}
+				
+				this.currentIteration = (IterationPlanNode) node;
+				this.currentIteration.acceptForStepFunction(this);
+				
+				// pop the current iteration from the stack
+				if (this.iterationStack.isEmpty()) {
+					this.currentIteration = null;
+				} else {
+					this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
+				}
+				
+				// inputs for initial bulk partial solution or initial workset are already connected to the iteration head in the head's post visit.
+				// connect the initial solution set now.
+				if (node instanceof WorksetIterationPlanNode) {
+					// connect the initial solution set
+					WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node;
+					AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
+					TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+					int inputIndex = headConfig.getDriverStrategy().getNumInputs();
+					headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
+					translateChannel(wsNode.getInitialSolutionSetInput(), inputIndex, headVertex, headConfig, false);
+				}
+				
+				return;
+			}
+			
+			final AbstractJobVertex targetVertex = this.vertices.get(node);
+			
+			
+			// --------- Main Path: Translation of channels ----------
+			// 
+			// There are two paths of translation: One for chained tasks (or merged tasks in general),
+			// which do not have their own task vertex. The other for tasks that have their own vertex,
+			// or are the primary task in a vertex (to which the others are chained).
+			
+			// check whether this node has its own task, or is merged with another one
+			if (targetVertex == null) {
+				// node's task is merged with another task. it is either chained, of a merged head vertex
+				// from an iteration
+				final TaskInChain chainedTask;
+				if ((chainedTask = this.chainedTasks.get(node)) != null) {
+					// Chained Task. Sanity check first...
+					final Iterator<Channel> inConns = node.getInputs().iterator();
+					if (!inConns.hasNext()) {
+						throw new CompilerException("Bug: Found chained task with no input.");
+					}
+					final Channel inConn = inConns.next();
+					
+					if (inConns.hasNext()) {
+						throw new CompilerException("Bug: Found a chained task with more than one input!");
+					}
+					if (inConn.getLocalStrategy() != null && inConn.getLocalStrategy() != LocalStrategy.NONE) {
+						throw new CompilerException("Bug: Found a chained task with an input local strategy.");
+					}
+					if (inConn.getShipStrategy() != null && inConn.getShipStrategy() != ShipStrategyType.FORWARD) {
+						throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
+					}
+	
+					AbstractJobVertex container = chainedTask.getContainingVertex();
+					
+					if (container == null) {
+						final PlanNode sourceNode = inConn.getSource();
+						container = this.vertices.get(sourceNode);
+						if (container == null) {
+							// predecessor is itself chained
+							container = this.chainedTasks.get(sourceNode).getContainingVertex();
+							if (container == null) {
+								throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex.");
+							}
+						} else {
+							// predecessor is a proper task job vertex and this is the first chained task. add a forward connection entry.
+							new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD);
+						}
+						chainedTask.setContainingVertex(container);
+					}
+					
+					// add info about the input serializer type
+					chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0);
+					
+					// update name of container task
+					String containerTaskName = container.getName();
+					if(containerTaskName.startsWith("CHAIN ")) {
+						container.setName(containerTaskName+" -> "+chainedTask.getTaskName());
+					} else {
+						container.setName("CHAIN "+containerTaskName+" -> "+chainedTask.getTaskName());
+					}
+					
+					this.chainedTasksInSequence.add(chainedTask);
+					return;
+				}
+				else if (node instanceof BulkPartialSolutionPlanNode ||
+						node instanceof WorksetPlanNode)
+				{
+					// merged iteration head task. the task that the head is merged with will take care of it
+					return;
+				} else {
+					throw new CompilerException("Bug: Unrecognized merged task vertex.");
+				}
+			}
+			
+			// -------- Here, we translate non-chained tasks -------------
+			
+			
+			if (this.currentIteration != null) {
+				AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
+				// the head may still be null if we descend into the static parts first
+				if (head != null) {
+					targetVertex.setStrictlyCoLocatedWith(head);
+				}
+			}
+			
+			
+			// create the config that will contain all the description of the inputs
+			final TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration());
+						
+			// get the inputs. if this node is the head of an iteration, we obtain the inputs from the
+			// enclosing iteration node, because the inputs are the initial inputs to the iteration.
+			final Iterator<Channel> inConns;
+			if (node instanceof BulkPartialSolutionPlanNode) {
+				inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator();
+				// because the partial solution has its own vertex, is has only one (logical) input.
+				// note this in the task configuration
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+			} else if (node instanceof WorksetPlanNode) {
+				WorksetPlanNode wspn = (WorksetPlanNode) node;
+				// input that is the initial workset
+				inConns = Collections.singleton(wspn.getContainingIterationNode().getInput2()).iterator();
+				
+				// because we have a stand-alone (non-merged) workset iteration head, the initial workset will
+				// be input 0 and the solution set will be input 1
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+				targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
+			} else {
+				inConns = node.getInputs().iterator();
+			}
+			if (!inConns.hasNext()) {
+				throw new CompilerException("Bug: Found a non-source task with no input.");
+			}
+			
+			int inputIndex = 0;
+			while (inConns.hasNext()) {
+				Channel input = inConns.next();
+				inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
+			}
+			// broadcast variables
+			int broadcastInputIndex = 0;
+			for (NamedChannel broadcastInput: node.getBroadcastInputs()) {
+				int broadcastInputIndexDelta = translateChannel(broadcastInput, broadcastInputIndex, targetVertex, targetVertexConfig, true);
+				targetVertexConfig.setBroadcastInputName(broadcastInput.getName(), broadcastInputIndex);
+				targetVertexConfig.setBroadcastInputSerializer(broadcastInput.getSerializer(), broadcastInputIndex);
+				broadcastInputIndex += broadcastInputIndexDelta;
+			}
+		} catch (Exception e) {
+			throw new CompilerException(
+				"An error occurred while translating the optimized plan to a nephele JobGraph: " + e.getMessage(), e);
+		}
+	}
+	
+	private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex,
+			TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
+	{
+		final PlanNode inputPlanNode = input.getSource();
+		final Iterator<Channel> allInChannels;
+		
+		if (inputPlanNode instanceof NAryUnionPlanNode) {
+			allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator();
+		}
+		else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) inputPlanNode;
+				final BulkIterationPlanNode iterationNode = pspn.getContainingIterationNode();
+				
+				// check if the iteration's input is a union
+				if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
+				}
+				
+				// also, set the index of the gate with the partial solution
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof WorksetPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final WorksetPlanNode wspn = (WorksetPlanNode) inputPlanNode;
+				final WorksetIterationPlanNode iterationNode = wspn.getContainingIterationNode();
+				
+				// check if the iteration's input is a union
+				if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
+				}
+				
+				// also, set the index of the gate with the partial solution
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof SolutionSetPlanNode) {
+			// for now, skip connections with the solution set node, as this is a local index access (later to be parameterized here)
+			// rather than a vertex connection
+			return 0;
+		} else {
+			allInChannels = Collections.singletonList(input).iterator();
+		}
+		
+		// check that the type serializer is consistent
+		TypeSerializerFactory<?> typeSerFact = null;
+		
+		// accounting for channels on the dynamic path
+		int numChannelsTotal = 0;
+		int numChannelsDynamicPath = 0;
+		int numDynamicSenderTasksTotal = 0;
+		
+
+		// expand the channel to all the union channels, in case there is a union operator at its source
+		while (allInChannels.hasNext()) {
+			final Channel inConn = allInChannels.next();
+			
+			// sanity check the common serializer
+			if (typeSerFact == null) {
+				typeSerFact = inConn.getSerializer();
+			} else if (!typeSerFact.equals(inConn.getSerializer())) {
+				throw new CompilerException("Conflicting types in union operator.");
+			}
+			
+			final PlanNode sourceNode = inConn.getSource();
+			AbstractJobVertex sourceVertex = this.vertices.get(sourceNode);
+			TaskConfig sourceVertexConfig;
+
+			if (sourceVertex == null) {
+				// this predecessor is chained to another task or an iteration
+				final TaskInChain chainedTask;
+				final IterationDescriptor iteration;
+				if ((chainedTask = this.chainedTasks.get(sourceNode)) != null) {
+					// push chained task
+					if (chainedTask.getContainingVertex() == null) {
+						throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
+					}
+					sourceVertex = chainedTask.getContainingVertex();
+					sourceVertexConfig = chainedTask.getTaskConfig();
+				} else if ((iteration = this.iterations.get(sourceNode)) != null) {
+					// predecessor is an iteration
+					sourceVertex = iteration.getHeadTask();
+					sourceVertexConfig = iteration.getHeadFinalResultConfig();
+				} else {
+					throw new CompilerException("Bug: Could not resolve source node for a channel.");
+				}
+			} else {
+				// predecessor is its own vertex
+				sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration());
+			}
+			DistributionPattern pattern = connectJobVertices(
+				inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
+			
+			// accounting on channels and senders
+			numChannelsTotal++;
+			if (inConn.isOnDynamicPath()) {
+				numChannelsDynamicPath++;
+				numDynamicSenderTasksTotal += getNumberOfSendersPerReceiver(pattern,
+					sourceVertex.getParallelism(), targetVertex.getParallelism());
+			}
+		}
+		
+		// for the iterations, check that the number of dynamic channels is the same as the number
+		// of channels for this logical input. this condition is violated at the moment, if there
+		// is a union between nodes on the static and nodes on the dynamic path
+		if (numChannelsDynamicPath > 0 && numChannelsTotal != numChannelsDynamicPath) {
+			throw new CompilerException("Error: It is currently not supported to union between dynamic and static path in an iteration.");
+		}
+		if (numDynamicSenderTasksTotal > 0) {
+			if (isBroadcast) {
+				targetVertexConfig.setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			} else {
+				targetVertexConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			}
+		}
+		
+		// the local strategy is added only once. in non-union case that is the actual edge,
+		// in the union case, it is the edge between union and the target node
+		addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
+		return 1;
+	}
+	
+	private int getNumberOfSendersPerReceiver(DistributionPattern pattern, int numSenders, int numReceivers) {
+		if (pattern == DistributionPattern.ALL_TO_ALL) {
+			return numSenders;
+		} else if (pattern == DistributionPattern.POINTWISE) {
+			if (numSenders != numReceivers) {
+				if (numReceivers == 1) {
+					return numSenders;
+				}
+				else if (numSenders == 1) {
+					return 1;
+				}
+				else {
+					throw new CompilerException("Error: A changing degree of parallelism is currently " +
+							"not supported between tasks within an iteration.");
+				}
+			} else {
+				return 1;
+			}
+		} else {
+			throw new CompilerException("Unknown distribution pattern for channels: " + pattern);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	// Methods for creating individual vertices
+	// ------------------------------------------------------------------------
+	
+	private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		
+		// check, whether chaining is possible
+		boolean chaining = false;
+		{
+			Channel inConn = node.getInput();
+			PlanNode pred = inConn.getSource();
+			chaining = ds.getPushChainDriverClass() != null &&
+					!(pred instanceof NAryUnionPlanNode) &&	// first op after union is stand-alone, because union is merged
+					!(pred instanceof BulkPartialSolutionPlanNode) &&	// partial solution merges anyways
+					!(pred instanceof WorksetPlanNode) &&	// workset merges anyways
+					!(pred instanceof IterationPlanNode) && // cannot chain with iteration heads currently
+					inConn.getShipStrategy() == ShipStrategyType.FORWARD &&
+					inConn.getLocalStrategy() == LocalStrategy.NONE &&
+					pred.getOutgoingChannels().size() == 1 &&
+					node.getParallelism() == pred.getParallelism() &&
+					node.getBroadcastInputs().isEmpty();
+			
+			// cannot chain the nodes that produce the next workset or the next solution set, if they are not the
+			// in a tail 
+			if (this.currentIteration != null && this.currentIteration instanceof WorksetIterationPlanNode &&
+					node.getOutgoingChannels().size() > 0)
+			{
+				WorksetIterationPlanNode wspn = (WorksetIterationPlanNode) this.currentIteration;
+				if (wspn.getSolutionSetDeltaPlanNode() == pred || wspn.getNextWorkSetPlanNode() == pred) {
+					chaining = false;
+				}
+			}
+			// cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows
+			if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode)
+			{
+				BulkIterationPlanNode wspn = (BulkIterationPlanNode) this.currentIteration;
+				if (node == wspn.getRootOfTerminationCriterion() && wspn.getRootOfStepFunction() == pred){
+					chaining = false;
+				}else if(node.getOutgoingChannels().size() > 0 &&(wspn.getRootOfStepFunction() == pred ||
+						wspn.getRootOfTerminationCriterion() == pred)) {
+					chaining = false;
+				}
+			}
+		}
+		
+		final AbstractJobVertex vertex;
+		final TaskConfig config;
+		
+		if (chaining) {
+			vertex = null;
+			config = new TaskConfig(new Configuration());
+			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
+		} else {
+			// create task vertex
+			vertex = new AbstractJobVertex(taskName);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+			
+			config = new TaskConfig(vertex.getConfiguration());
+			config.setDriver(ds.getDriverClass());
+		}
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+		
+		// set the driver strategy
+		config.setDriverStrategy(ds);
+		for(int i=0;i<ds.getNumRequiredComparators();i++) {
+			config.setDriverComparator(node.getComparator(i), i);
+		}
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
+	private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+		
+		// set the driver strategy
+		config.setDriver(ds.getDriverClass());
+		config.setDriverStrategy(ds);
+		if (node.getComparator1() != null) {
+			config.setDriverComparator(node.getComparator1(), 0);
+		}
+		if (node.getComparator2() != null) {
+			config.setDriverComparator(node.getComparator2(), 1);
+		}
+		if (node.getPairComparator() != null) {
+			config.setDriverPairComparator(node.getPairComparator());
+		}
+		
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
+	private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
+		final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSourceTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		config.setOutputSerializer(node.getSerializer());
+		return vertex;
+	}
+
+	private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
+		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSinkTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		return vertex;
+	}
+	
+	private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+		// get the bulk iteration that corresponds to this partial solution node
+		final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();
+		
+		// check whether we need an individual vertex for the partial solution, or whether we
+		// attach ourselves to the vertex of the parent node. We can combine the head with a node of 
+		// the step function, if
+		// 1) There is one parent that the partial solution connects to via a forward pattern and no
+		//    local strategy
+		// 2) DOP and the number of subtasks per instance does not change
+		// 3) That successor is not a union
+		// 4) That successor is not itself the last node of the step function
+		// 5) There is no local strategy on the edge for the initial partial solution, as
+		//    this translates to a local strategy that would only be executed in the first iteration
+		
+		final boolean merge;
+		if (mergeIterationAuxTasks && pspn.getOutgoingChannels().size() == 1) {
+			final Channel c = pspn.getOutgoingChannels().get(0);
+			final PlanNode successor = c.getTarget();
+			merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+					c.getLocalStrategy() == LocalStrategy.NONE &&
+					c.getTempMode() == TempMode.NONE &&
+					successor.getParallelism() == pspn.getParallelism() &&
+					!(successor instanceof NAryUnionPlanNode) &&
+					successor != iteration.getRootOfStepFunction() &&
+					iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
+		} else {
+			merge = false;
+		}
+		
+		// create or adopt the head vertex
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
+		final TaskConfig headConfig;
+		if (merge) {
+			final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+			
+			if (headVertex == null) {
+				throw new CompilerException(
+					"Bug: Trying to merge solution set with its sucessor, but successor has not been created.");
+			}
+			
+			// reset the vertex type to iteration head
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			toReturn = null;
+		} else {
+			// instantiate the head vertex and give it a no-op driver as the driver strategy.
+			// everything else happens in the post visit, after the input (the initial partial solution)
+			// is connected.
+			headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			headConfig.setDriver(NoOpDriver.class);
+			toReturn = headVertex;
+		}
+		
+		// create the iteration descriptor and the iteration to it
+		IterationDescriptor descr = this.iterations.get(iteration);
+		if (descr == null) {
+			throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node.");
+		}
+		descr.setHeadTask(headVertex, headConfig);
+		
+		return toReturn;
+	}
+	
+	private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+		// get the bulk iteration that corresponds to this partial solution node
+		final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
+		
+		// check whether we need an individual vertex for the partial solution, or whether we
+		// attach ourselves to the vertex of the parent node. We can combine the head with a node of 
+		// the step function, if
+		// 1) There is one parent that the partial solution connects to via a forward pattern and no
+		//    local strategy
+		// 2) DOP and the number of subtasks per instance does not change
+		// 3) That successor is not a union
+		// 4) That successor is not itself the last node of the step function
+		// 5) There is no local strategy on the edge for the initial workset, as
+		//    this translates to a local strategy that would only be executed in the first superstep
+		
+		final boolean merge;
+		if (mergeIterationAuxTasks && wspn.getOutgoingChannels().size() == 1) {
+			final Channel c = wspn.getOutgoingChannels().get(0);
+			final PlanNode successor = c.getTarget();
+			merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+					c.getLocalStrategy() == LocalStrategy.NONE &&
+					c.getTempMode() == TempMode.NONE &&
+					successor.getParallelism() == wspn.getParallelism() &&
+					!(successor instanceof NAryUnionPlanNode) &&
+					successor != iteration.getNextWorkSetPlanNode() &&
+					iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
+		} else {
+			merge = false;
+		}
+		
+		// create or adopt the head vertex
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
+		final TaskConfig headConfig;
+		if (merge) {
+			final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+			
+			if (headVertex == null) {
+				throw new CompilerException(
+					"Bug: Trying to merge solution set with its sucessor, but successor has not been created.");
+			}
+			
+			// reset the vertex type to iteration head
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			toReturn = null;
+		} else {
+			// instantiate the head vertex and give it a no-op driver as the driver strategy.
+			// everything else happens in the post visit, after the input (the initial partial solution)
+			// is connected.
+			headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")");
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			headConfig.setDriver(NoOpDriver.class);
+			toReturn = headVertex;
+		}
+		
+		headConfig.setSolutionSetUnmanaged(iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged());
+		
+		// create the iteration descriptor and the iteration to it
+		IterationDescriptor descr = this.iterations.get(iteration);
+		if (descr == null) {
+			throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node.");
+		}
+		descr.setHeadTask(headVertex, headConfig);
+		
+		return toReturn;
+	}
+	
+	private void assignDriverResources(PlanNode node, TaskConfig config) {
+		final double relativeMem = node.getRelativeMemoryPerSubTask();
+		if (relativeMem > 0) {
+			config.setRelativeMemoryDriver(relativeMem);
+			config.setFilehandlesDriver(this.defaultMaxFan);
+			config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
+		}
+	}
+	
+	private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
+		if (c.getRelativeMemoryLocalStrategy() > 0) {
+			config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
+			config.setFilehandlesInput(inputNum, this.defaultMaxFan);
+			config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Connecting Vertices
+	// ------------------------------------------------------------------------
+
+	/**
+	 * NOTE: The channel for global and local strategies are different if we connect a union. The global strategy
+	 * channel is then the channel into the union node, the local strategy channel the one from the union to the
+	 * actual target operator.
+	 *
+	 * @param channel
+	 * @param inputNumber
+	 * @param sourceVertex
+	 * @param sourceConfig
+	 * @param targetVertex
+	 * @param targetConfig
+	 * @param isBroadcast
+	 * @throws CompilerException
+	 */
+	private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
+			final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
+			final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
+	throws CompilerException
+	{
+		// ------------ connect the vertices to the job graph --------------
+		final DistributionPattern distributionPattern;
+
+		switch (channel.getShipStrategy()) {
+			case FORWARD:
+				distributionPattern = DistributionPattern.POINTWISE;
+				break;
+			case PARTITION_RANDOM:
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
+			case PARTITION_RANGE:
+			case PARTITION_FORCED_REBALANCE:
+				distributionPattern = DistributionPattern.ALL_TO_ALL;
+				break;
+			default:
+				throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
+		}
+
+		final ResultPartitionType resultType;
+
+		switch (channel.getDataExchangeMode()) {
+
+			case PIPELINED:
+				resultType = ResultPartitionType.PIPELINED;
+				break;
+
+			case BATCH:
+				// BLOCKING results are currently not supported in closed loop iterations
+				//
+				// See https://issues.apache.org/jira/browse/FLINK-1713 for details
+				resultType = channel.getSource().isOnDynamicPath()
+						? ResultPartitionType.PIPELINED
+						: ResultPartitionType.BLOCKING;
+				break;
+
+			case PIPELINE_WITH_BATCH_FALLBACK:
+				throw new UnsupportedOperationException("Data exchange mode " +
+						channel.getDataExchangeMode() + " currently not supported.");
+
+			default:
+				throw new UnsupportedOperationException("Unknown data exchange mode.");
+
+		}
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
+
+		// -------------- configure the source task's ship strategy strategies in task config --------------
+		final int outputIndex = sourceConfig.getNumOutputs();
+		sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
+		if (outputIndex == 0) {
+			sourceConfig.setOutputSerializer(channel.getSerializer());
+		}
+		if (channel.getShipStrategyComparator() != null) {
+			sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
+		}
+		
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
+			
+			final DataDistribution dataDistribution = channel.getDataDistribution();
+			if (dataDistribution != null) {
+				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
+			} else {
+				throw new RuntimeException("Range partitioning requires data distribution");
+				// TODO: inject code and configuration for automatic histogram generation
+			}
+		}
+		
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+			if (channel.getPartitioner() != null) {
+				sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+			} else {
+				throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+			}
+		}
+		
+		// ---------------- configure the receiver -------------------
+		if (isBroadcast) {
+			targetConfig.addBroadcastInputToGroup(inputNumber);
+		} else {
+			targetConfig.addInputToGroup(inputNumber);
+		}
+		return distributionPattern;
+	}
+	
+	private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
+		// serializer
+		if (isBroadcastChannel) {
+			config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
+			
+			if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
+				throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
+			} else {
+				return;
+			}
+		} else {
+			config.setInputSerializer(channel.getSerializer(), inputNum);
+		}
+		
+		// local strategy
+		if (channel.getLocalStrategy() != LocalStrategy.NONE) {
+			config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
+			if (channel.getLocalStrategyComparator() != null) {
+				config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
+			}
+		}
+		
+		assignLocalStrategyResources(channel, config, inputNum);
+		
+		// materialization / caching
+		if (channel.getTempMode() != null) {
+			final TempMode tm = channel.getTempMode();
+
+			boolean needsMemory = false;
+			// Don't add a pipeline breaker if the data exchange is already blocking.
+			if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+				config.setInputAsynchronouslyMaterialized(inputNum, true);
+				needsMemory = true;
+			}
+			if (tm.isCached()) {
+				config.setInputCached(inputNum, true);
+				needsMemory = true;
+			}
+			
+			if (needsMemory) {
+				// sanity check
+				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
+				}
+				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
+			}
+		}
+	}
+	
+	private void finalizeBulkIteration(IterationDescriptor descr) {
+		
+		final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+		
+		// ------------ finalize the head config with the final outputs and the sync gate ------------
+		final int numStepFunctionOuts = headConfig.getNumOutputs();
+		final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+		
+		if (numStepFunctionOuts == 0) {
+			throw new CompilerException("The iteration has no operation inside the step function.");
+		}
+		
+		headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+		headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+		final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
+		if (relativeMemForBackChannel <= 0) {
+			throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
+		}
+		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
+		
+		// --------------------------- create the sync task ---------------------------
+		final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")");
+		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+		sync.setParallelism(1);
+		this.auxVertices.add(sync);
+		
+		final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
+		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+
+		// set the number of iteration / convergence criterion for the sync
+		final int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+		if (maxNumIterations < 1) {
+			throw new CompilerException("Cannot create bulk iteration with unspecified maximum number of iterations.");
+		}
+		syncConfig.setNumberOfIterations(maxNumIterations);
+		
+		// connect the sync task
+		sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+		
+		// ----------------------------- create the iteration tail ------------------------------
+		
+		final PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion();
+		final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
+		final TaskConfig tailConfig;
+		
+		AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
+		if (rootOfStepFunctionVertex == null) {
+			// last op is chained
+			final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
+			if (taskInChain == null) {
+				throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
+			}
+			rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+			// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
+			tailConfig = taskInChain.getTaskConfig();
+		} else {
+			tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration());
+		}
+		
+		tailConfig.setIsWorksetUpdate();
+		
+		// No following termination criterion
+		if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
+			
+			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+			
+			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+		}
+		
+		
+		// create the fake output task for termination criterion, if needed
+		final TaskConfig tailConfigOfTerminationCriterion;
+		// If we have a termination criterion and it is not an intermediate node
+		if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
+			AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
+			
+			
+			if (rootOfTerminationCriterionVertex == null) {
+				// last op is chained
+				final TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion);
+				if (taskInChain == null) {
+					throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
+				}
+				rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+				// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
+				tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
+			} else {
+				tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
+			}
+			
+			rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+			// Hack
+			tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
+			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+			
+			// tell the head that it needs to wait for the solution set updates
+			headConfig.setWaitForSolutionSetUpdate();
+		}
+		
+		// ------------------- register the aggregators -------------------
+		AggregatorRegistry aggs = bulkNode.getIterationNode().getIterationContract().getAggregators();
+		Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+		
+		headConfig.addIterationAggregators(allAggregators);
+		syncConfig.addIterationAggregators(allAggregators);
+		
+		String convAggName = aggs.getConvergenceCriterionAggregatorName();
+		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+		
+		if (convCriterion != null || convAggName != null) {
+			if (convCriterion == null) {
+				throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
+			}
+			if (convAggName == null) {
+				throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
+			}
+			
+			syncConfig.setConvergenceCriterion(convAggName, convCriterion);
+		}
+	}
+	
+	private void finalizeWorksetIteration(IterationDescriptor descr) {
+		final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+		
+		// ------------ finalize the head config with the final outputs and the sync gate ------------
+		{
+			final int numStepFunctionOuts = headConfig.getNumOutputs();
+			final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+			
+			if (numStepFunctionOuts == 0) {
+				throw new CompilerException("The workset iteration has no operation on the workset inside the step function.");
+			}
+			
+			headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+			headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+			final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
+			if (relativeMemory <= 0) {
+				throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
+			}
+			
+			headConfig.setIsWorksetIteration();
+			headConfig.setRelativeBackChannelMemory(relativeMemory / 2);
+			headConfig.setRelativeSolutionSetMemory(relativeMemory / 2);
+			
+			// set the solution set serializer and comparator
+			headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
+			headConfig.setSolutionSetComparator(iterNode.getSolutionSetComparator());
+		}
+		
+		// --------------------------- create the sync task ---------------------------
+		final TaskConfig syncConfig;
+		{
+			final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")");
+			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+			sync.setParallelism(1);
+			this.auxVertices.add(sync);
+			
+			syncConfig = new TaskConfig(sync.getConfiguration());
+			syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+	
+			// set the number of iteration / convergence criterion for the sync
+			final int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+			if (maxNumIterations < 1) {
+				throw new CompilerException("Cannot create workset iteration with unspecified maximum number of iterations.");
+			}
+			syncConfig.setNumberOfIterations(maxNumIterations);
+			
+			// connect the sync task
+			sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+		}
+		
+		// ----------------------------- create the iteration tails -----------------------------
+		// ----------------------- for next workset and solution set delta-----------------------
+
+		{
+			// we have three possible cases:
+			// 1) Two tails, one for workset update, one for solution set update
+			// 2) One tail for workset update, solution set update happens in an intermediate task
+			// 3) One tail for solution set update, workset update happens in an intermediate task
+			
+			final PlanNode nextWorksetNode = iterNode.getNextWorkSetPlanNode();
+			final PlanNode solutionDeltaNode = iterNode.getSolutionSetDeltaPlanNode();
+			
+			final boolean hasWorksetTail = nextWorksetNode.getOutgoingChannels().isEmpty();
+			final boolean hasSolutionSetTail = (!iterNode.isImmediateSolutionSetUpdate()) || (!hasWorksetTail);
+			
+			{
+				// get the vertex for the workset update
+				final TaskConfig worksetTailConfig;
+				AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
+				if (nextWorksetVertex == null) {
+					// nextWorksetVertex is chained
+					TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
+					if (taskInChain == null) {
+						throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
+					}
+					nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					worksetTailConfig = taskInChain.getTaskConfig();
+				} else {
+					worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
+				}
+				
+				// mark the node to perform workset updates
+				worksetTailConfig.setIsWorksetIteration();
+				worksetTailConfig.setIsWorksetUpdate();
+				
+				if (hasWorksetTail) {
+					nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+					
+					worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
+				}
+			}
+			{
+				final TaskConfig solutionDeltaConfig;
+				AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
+				if (solutionDeltaVertex == null) {
+					// last op is chained
+					TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
+					if (taskInChain == null) {
+						throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
+					}
+					solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					solutionDeltaConfig = taskInChain.getTaskConfig();
+				} else {
+					solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
+				}
+				
+				solutionDeltaConfig.setIsWorksetIteration();
+				solutionDeltaConfig.setIsSolutionSetUpdate();
+				
+				if (hasSolutionSetTail) {
+					solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+					
+					solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
+					
+					// tell the head that it needs to wait for the solution set updates
+					headConfig.setWaitForSolutionSetUpdate();
+				}
+				else {
+					// no tail, intermediate update. must be immediate update
+					if (!iterNode.isImmediateSolutionSetUpdate()) {
+						throw new CompilerException("A solution set update without dedicated tail is not set to perform immediate updates.");
+					}
+					solutionDeltaConfig.setIsSolutionSetUpdateWithoutReprobe();
+				}
+			}
+		}
+		
+		// ------------------- register the aggregators -------------------
+		AggregatorRegistry aggs = iterNode.getIterationNode().getIterationContract().getAggregators();
+		Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+		
+		for (AggregatorWithName<?> agg : allAggregators) {
+			if (agg.getName().equals(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME)) {
+				throw new CompilerException("User defined aggregator used the same name as built-in workset " +
+						"termination check aggregator: " + WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
+			}
+		}
+		
+		headConfig.addIterationAggregators(allAggregators);
+		syncConfig.addIterationAggregators(allAggregators);
+		
+		String convAggName = aggs.getConvergenceCriterionAggregatorName();
+		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+		
+		if (convCriterion != null || convAggName != null) {
+			throw new CompilerException("Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.");
+		}
+		
+		headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
+	}
+	
+	private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+		try {
+			if (wrapper.hasObject()) {
+				try {
+					return wrapper.getUserCodeObject().toString();
+				}
+				catch (Throwable t) {
+					return wrapper.getUserCodeClass().getName();
+				}
+			}
+			else {
+				return wrapper.getUserCodeClass().getName();
+			}
+		}
+		catch (Throwable t) {
+			return null;
+		}
+	}
+
+	// -------------------------------------------------------------------------------------
+	// Descriptors for tasks / configurations that are chained or merged with other tasks
+	// -------------------------------------------------------------------------------------
+	
+	/**
+	 * Utility class that describes a task in a sequence of chained tasks. Chained tasks are tasks that run
+	 * together in one thread.
+	 */
+	private static final class TaskInChain {
+		
+		private final Class<? extends ChainedDriver<?, ?>> chainedTask;
+		
+		private final TaskConfig taskConfig;
+		
+		private final String taskName;
+		
+		private AbstractJobVertex containingVertex;
+
+		TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
+					String taskName) {
+			this.chainedTask = chainedTask;
+			this.taskConfig = taskConfig;
+			this.taskName = taskName;
+		}
+		
+		public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
+			return this.chainedTask;
+		}
+		
+		public TaskConfig getTaskConfig() {
+			return this.taskConfig;
+		}
+		
+		public String getTaskName() {
+			return this.taskName;
+		}
+		
+		public AbstractJobVertex getContainingVertex() {
+			return this.containingVertex;
+		}
+		
+		public void setContainingVertex(AbstractJobVertex containingVertex) {
+			this.containingVertex = containingVertex;
+		}
+	}
+	
+	private static final class IterationDescriptor {
+		
+		private final IterationPlanNode iterationNode;
+		
+		private AbstractJobVertex headTask;
+		
+		private TaskConfig headConfig;
+		
+		private TaskConfig  headFinalResultConfig;
+		
+		private final int id;
+
+		public IterationDescriptor(IterationPlanNode iterationNode, int id) {
+			this.iterationNode = iterationNode;
+			this.id = id;
+		}
+		
+		public IterationPlanNode getIterationNode() {
+			return iterationNode;
+		}
+		
+		public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
+			this.headTask = headTask;
+			this.headFinalResultConfig = new TaskConfig(new Configuration());
+			
+			// check if we already had a configuration, for example if the solution set was 
+			if (this.headConfig != null) {
+				headConfig.getConfiguration().addAll(this.headConfig.getConfiguration());
+			}
+			
+			this.headConfig = headConfig;
+		}
+		
+		public AbstractJobVertex getHeadTask() {
+			return headTask;
+		}
+		
+		public TaskConfig getHeadFinalResultConfig() {
+			return headFinalResultConfig;
+		}
+		
+		public int getId() {
+			return this.id;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
new file mode 100644
index 0000000..f2b736c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import java.util.Map;
+
+public abstract class AbstractSchema<X> implements Iterable<Map.Entry<Integer, X>> {
+
+	private int numConnectionsThatContributed;
+	
+	
+	public int getNumConnectionsThatContributed() {
+		return this.numConnectionsThatContributed;
+	}
+	
+	public void increaseNumConnectionsThatContributed() {
+		this.numConnectionsThatContributed++;
+	}
+	
+	public abstract void addType(int pos, X type) throws ConflictingFieldTypeInfoException;
+	
+	public abstract X getType(int field);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
new file mode 100644
index 0000000..56d914c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.postpass;
+
+public final class ConflictingFieldTypeInfoException extends Exception {
+
+	private static final long serialVersionUID = 3991352502693288321L;
+
+	private final int fieldNumber;
+	
+	private final Object previousType, newType;
+
+	
+	public ConflictingFieldTypeInfoException(int fieldNumber, Object previousType, Object newType) {
+		super("Conflicting type info for field " + fieldNumber + ": Old='" + previousType + "', new='" + newType + "'.");
+		this.fieldNumber = fieldNumber;
+		this.previousType = previousType;
+		this.newType = newType;
+	}
+	
+	
+	public int getFieldNumber() {
+		return fieldNumber;
+	}
+
+	public Object getPreviousType() {
+		return this.previousType;
+	}
+
+	public Object getNewType() {
+		return this.newType;
+	}
+}