Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:32 UTC

[53/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes

[FLINK-441] [optimizer] Rename Pact* and Nephele* classes

Also clean up and improve various comments and method names.
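
The renames visible in this change include:

  PactCompiler             -> Optimizer
  NepheleJobGraphGenerator -> JobGraphGenerator
  PactConnection           -> DagConnection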


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9150b30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9150b30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9150b30

Branch: refs/heads/master
Commit: a9150b30f45cf2280130503667e5615b9471ee0a
Parents: 9d222ca
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 17 11:24:35 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |   14 +-
 .../org/apache/flink/client/program/Client.java |   16 +-
 .../flink/client/program/PackagedProgram.java   |    8 +-
 .../apache/flink/client/program/ClientTest.java |   16 +-
 .../flink/optimizer/CompilerException.java      |    9 +-
 .../optimizer/CompilerPostPassException.java    |    8 +-
 .../apache/flink/optimizer/DataStatistics.java  |    7 +-
 .../org/apache/flink/optimizer/Optimizer.java   | 1372 +++++++++++++++
 .../apache/flink/optimizer/PactCompiler.java    | 1372 ---------------
 .../dag/AbstractPartialSolutionNode.java        |    5 +-
 .../flink/optimizer/dag/BinaryUnionNode.java    |    8 +-
 .../flink/optimizer/dag/BulkIterationNode.java  |   26 +-
 .../optimizer/dag/BulkPartialSolutionNode.java  |   13 +-
 .../apache/flink/optimizer/dag/CoGroupNode.java |    6 +-
 .../apache/flink/optimizer/dag/CrossNode.java   |   16 +-
 .../flink/optimizer/dag/DagConnection.java      |  290 ++++
 .../flink/optimizer/dag/DataSinkNode.java       |   32 +-
 .../flink/optimizer/dag/DataSourceNode.java     |   36 +-
 .../flink/optimizer/dag/EstimateProvider.java   |    6 +-
 .../apache/flink/optimizer/dag/FilterNode.java  |    4 +-
 .../apache/flink/optimizer/dag/FlatMapNode.java |    4 +-
 .../flink/optimizer/dag/GroupCombineNode.java   |    6 +-
 .../flink/optimizer/dag/GroupReduceNode.java    |   22 +-
 .../apache/flink/optimizer/dag/JoinNode.java    |   20 +-
 .../apache/flink/optimizer/dag/MatchNode.java   |   20 +-
 .../flink/optimizer/dag/OptimizerNode.java      |  216 +--
 .../flink/optimizer/dag/PactConnection.java     |  290 ----
 .../flink/optimizer/dag/PartitionNode.java      |    6 +-
 .../apache/flink/optimizer/dag/ReduceNode.java  |    4 +-
 .../flink/optimizer/dag/SingleInputNode.java    |   48 +-
 .../apache/flink/optimizer/dag/SinkJoiner.java  |   11 +-
 .../flink/optimizer/dag/SolutionSetNode.java    |    8 +-
 .../flink/optimizer/dag/SortPartitionNode.java  |    4 +-
 .../flink/optimizer/dag/TwoInputNode.java       |   86 +-
 .../optimizer/dag/WorksetIterationNode.java     |   26 +-
 .../apache/flink/optimizer/dag/WorksetNode.java |    8 +-
 .../dataproperties/GlobalProperties.java        |    6 +-
 .../dataproperties/InterestingProperties.java   |   19 +-
 .../dataproperties/LocalProperties.java         |    2 +-
 .../dataproperties/PartitioningProperty.java    |   23 +-
 .../RequestedLocalProperties.java               |   18 +-
 .../operators/AllGroupCombineProperties.java    |    2 +-
 .../operators/AllGroupReduceProperties.java     |    2 +-
 .../AllGroupWithPartialPreGroupProperties.java  |    8 +-
 .../operators/AllReduceProperties.java          |    8 +-
 .../operators/CartesianProductDescriptor.java   |    2 +-
 .../optimizer/operators/CoGroupDescriptor.java  |    2 +-
 .../CoGroupWithSolutionSetFirstDescriptor.java  |    2 +-
 .../operators/CollectorMapDescriptor.java       |    2 +-
 .../optimizer/operators/FilterDescriptor.java   |    2 +-
 .../optimizer/operators/FlatMapDescriptor.java  |    2 +-
 .../operators/GroupCombineProperties.java       |    4 +-
 .../operators/GroupReduceProperties.java        |    2 +-
 .../GroupReduceWithCombineProperties.java       |    8 +-
 .../operators/HashJoinBuildFirstProperties.java |    2 +-
 .../HashJoinBuildSecondProperties.java          |    2 +-
 .../optimizer/operators/MapDescriptor.java      |    2 +-
 .../operators/MapPartitionDescriptor.java       |    2 +-
 .../operators/PartialGroupProperties.java       |    6 +-
 .../optimizer/operators/ReduceProperties.java   |    8 +-
 .../operators/SortMergeJoinDescriptor.java      |    2 +-
 .../optimizer/plan/BinaryUnionPlanNode.java     |    2 +-
 .../apache/flink/optimizer/plan/Channel.java    |   25 +-
 .../flink/optimizer/plan/DualInputPlanNode.java |    4 +-
 .../apache/flink/optimizer/plan/PlanNode.java   |   29 +-
 .../optimizer/plan/SingleInputPlanNode.java     |    2 +-
 .../plandump/PlanJSONDumpGenerator.java         |   24 +-
 .../plantranslate/JobGraphGenerator.java        | 1578 +++++++++++++++++
 .../plantranslate/NepheleJobGraphGenerator.java | 1581 ------------------
 .../postpass/GenericFlatTypePostPass.java       |   48 +-
 .../optimizer/postpass/JavaApiPostPass.java     |   18 +-
 .../optimizer/postpass/RecordModelPostPass.java |    6 +-
 .../optimizer/BranchingPlansCompilerTest.java   |   24 +-
 .../CachedMatchStrategyCompilerTest.java        |   26 +-
 .../flink/optimizer/CompilerTestBase.java       |   12 +-
 .../apache/flink/optimizer/DOPChangeTest.java   |    4 +-
 .../flink/optimizer/DisjointDataFlowsTest.java  |    4 +-
 .../optimizer/DistinctCompilationTest.java      |   28 +-
 .../optimizer/HardPlansCompilationTest.java     |    4 +-
 .../flink/optimizer/IterationsCompilerTest.java |   14 +-
 .../flink/optimizer/NestedIterationsTest.java   |    6 +-
 .../flink/optimizer/PipelineBreakerTest.java    |    8 +-
 .../apache/flink/optimizer/ReduceAllTest.java   |    4 +-
 .../SemanticPropertiesAPIToPlanTest.java        |    6 +-
 .../UnionBetweenDynamicAndStaticPathTest.java   |    6 +-
 .../optimizer/UnionPropertyPropagationTest.java |   12 +-
 .../flink/optimizer/UnionReplacementTest.java   |    4 +-
 .../WorksetIterationCornerCasesTest.java        |    4 +-
 .../WorksetIterationsRecordApiCompilerTest.java |    8 +-
 ...naryCustomPartitioningCompatibilityTest.java |    6 +-
 .../custompartition/CustomPartitioningTest.java |   28 +-
 .../DataExchangeModeClosedBranchingTest.java    |    2 +-
 .../DataExchangeModeOpenBranchingTest.java      |    2 +-
 .../dataexchange/PipelineBreakingTest.java      |   10 +-
 .../java/GroupReduceCompilationTest.java        |   50 +-
 .../optimizer/java/IterationCompilerTest.java   |   10 +-
 .../optimizer/java/ReduceCompilationTest.java   |   34 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |    8 +-
 .../CoGroupOnConflictingPartitioningsTest.java  |    6 +-
 .../JoinOnConflictingPartitioningsTest.java     |    6 +-
 .../flink/spargel/java/SpargelCompilerTest.java |   16 +-
 .../test/compiler/util/CompilerTestBase.java    |   14 +-
 .../flink/test/util/RecordAPITestBase.java      |    8 +-
 .../apache/flink/test/util/TestEnvironment.java |    8 +-
 .../test/cancelling/CancellingTestBase.java     |    8 +-
 .../ConnectedComponentsCoGroupTest.java         |    4 +-
 .../iterations/ConnectedComponentsTest.java     |    6 +-
 .../iterations/IterativeKMeansTest.java         |    6 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |    4 +-
 .../iterations/PageRankCompilerTest.java        |    4 +-
 .../compiler/plandump/PreviewPlanDumpTest.java  |    4 +-
 .../exampleJavaPrograms/WordCountITCase.java    |    5 -
 .../test/failingPrograms/TaskFailureITCase.java |   12 +-
 .../hadoop/mapred/WordCountMapredITCase.java    |    7 -
 .../mapreduce/WordCountMapreduceITCase.java     |    6 -
 .../javaApiOperators/GroupReduceITCase.java     |    4 +-
 .../flink/test/operators/CoGroupITCase.java     |   10 +-
 .../flink/test/operators/CrossITCase.java       |   30 +-
 .../apache/flink/test/operators/JoinITCase.java |   28 +-
 .../flink/test/operators/ReduceITCase.java      |   16 +-
 .../flink/test/operators/UnionSinkITCase.java   |    8 +-
 .../api/scala/operators/GroupReduceITCase.scala |    4 +-
 .../translation/CustomPartitioningTest.scala    |   28 +-
 123 files changed, 4042 insertions(+), 4032 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index a2aed8f..6df0f79 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -33,11 +33,11 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 /**
@@ -158,10 +158,10 @@ public class LocalExecutor extends PlanExecutor {
 			}
 
 			try {
-				PactCompiler pc = new PactCompiler(new DataStatistics());
+				Optimizer pc = new Optimizer(new DataStatistics());
 				OptimizedPlan op = pc.compile(plan);
 				
-				NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+				JobGraphGenerator jgg = new JobGraphGenerator();
 				JobGraph jobGraph = jgg.compileJobGraph(op);
 
 				ActorRef jobClient = flink.getJobClient();
@@ -186,7 +186,7 @@ public class LocalExecutor extends PlanExecutor {
 	 * @throws Exception
 	 */
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 	
@@ -242,7 +242,7 @@ public class LocalExecutor extends PlanExecutor {
 		LocalExecutor exec = new LocalExecutor();
 		try {
 			exec.start();
-			PactCompiler pc = new PactCompiler(new DataStatistics());
+			Optimizer pc = new Optimizer(new DataStatistics());
 			OptimizedPlan op = pc.compile(plan);
 			PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 
@@ -260,7 +260,7 @@ public class LocalExecutor extends PlanExecutor {
 	 */
 	public static String getPlanAsJSON(Plan plan) {
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-		List<DataSinkNode> sinks = PactCompiler.createPreOptimizedPlan(plan);
+		List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
 		return gen.getPactPlanAsJSON(sinks);
 	}
 

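For reference, a minimal sketch of how the renamed classes compose after this
change (mirroring the usage in LocalExecutor above; plan setup and error
handling omitted):

    Optimizer pc = new Optimizer(new DataStatistics());
    OptimizedPlan op = pc.compile(plan);

    JobGraphGenerator jgg = new JobGraphGenerator();
    JobGraph jobGraph = jgg.compileJobGraph(op);
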
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 84e6637..4e593c8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -32,13 +32,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -66,7 +66,7 @@ public class Client {
 	
 	private final Configuration configuration;	// the configuration describing the job manager address
 	
-	private final PactCompiler compiler;		// the compiler to compile the jobs
+	private final Optimizer compiler;		// the compiler to compile the jobs
 	
 	private boolean printStatusDuringExecution = false;
 	
@@ -88,7 +88,7 @@ public class Client {
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
 		
-		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
 	}
 
 	/**
@@ -112,7 +112,7 @@ public class Client {
 			throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
 		}
 
-		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
 	}
 	
 	public void setPrintStatusDuringExecution(boolean print) {
@@ -219,7 +219,7 @@ public class Client {
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
 		} else {
-			NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
+			JobGraphGenerator gen = new JobGraphGenerator();
 			job = gen.compileJobGraph((OptimizedPlan) optPlan);
 		}
 
@@ -355,12 +355,12 @@ public class Client {
 	
 	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
 		
-		private final PactCompiler compiler;
+		private final Optimizer compiler;
 		
 		private FlinkPlan optimizerPlan;
 		
 		
-		private OptimizerPlanEnvironment(PactCompiler compiler) {
+		private OptimizerPlanEnvironment(Optimizer compiler) {
 			this.compiler = compiler;
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index d79d404..10096da 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -46,7 +46,7 @@ import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.util.InstantiationUtil;
@@ -255,7 +255,7 @@ public class PackagedProgram {
 		List<DataSinkNode> previewPlan;
 		
 		if (isUsingProgramEntryPoint()) {
-			previewPlan = PactCompiler.createPreOptimizedPlan(getPlan());
+			previewPlan = Optimizer.createPreOptimizedPlan(getPlan());
 		}
 		else if (isUsingInteractiveMode()) {
 			// temporary hack to support the web client
@@ -698,7 +698,7 @@ public class PackagedProgram {
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			this.plan = createProgramPlan(jobName);
-			this.previewPlan = PactCompiler.createPreOptimizedPlan((Plan) plan);
+			this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan);
 			
 			// do not go on with anything now!
 			throw new Client.ProgramAbortException();
@@ -707,7 +707,7 @@ public class PackagedProgram {
 		@Override
 		public String getExecutionPlan() throws Exception {
 			Plan plan = createProgramPlan("unused");
-			this.previewPlan = PactCompiler.createPreOptimizedPlan(plan);
+			this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
 			
 			// do not go on with anything now!
 			throw new Client.ProgramAbortException();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index bac5c04..9278c7a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -27,10 +27,10 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -69,8 +69,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 public class ClientTest {
 
 	private PackagedProgram program;
-	private PactCompiler compilerMock;
-	private NepheleJobGraphGenerator generatorMock;
+	private Optimizer compilerMock;
+	private JobGraphGenerator generatorMock;
 
 
 	private Configuration config;
@@ -89,8 +89,8 @@ public class ClientTest {
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
 
 		program = mock(PackagedProgram.class);
-		compilerMock = mock(PactCompiler.class);
-		generatorMock = mock(NepheleJobGraphGenerator.class);
+		compilerMock = mock(Optimizer.class);
+		generatorMock = mock(JobGraphGenerator.class);
 
 		JobWithJars planWithJarsMock = mock(JobWithJars.class);
 		Plan planMock = mock(Plan.class);
@@ -101,10 +101,10 @@ public class ClientTest {
 		when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
 		when(planWithJarsMock.getPlan()).thenReturn(planMock);
 
-		whenNew(PactCompiler.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
+		whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
 		when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
 
-		whenNew(NepheleJobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
+		whenNew(JobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
 		when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraph);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
index 999c687..2f99ddb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
@@ -16,24 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer;
 
 /**
- * An exception that is thrown by the pact compiler when encountering an illegal condition.
+ * An exception that is thrown by the Optimizer when encountering an illegal condition.
  */
 public class CompilerException extends RuntimeException {
 
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
 	private static final long serialVersionUID = 3810067304570563755L;
 
 	/**
 	 * Creates a compiler exception with no message and no cause.
 	 */
-	public CompilerException() {
-	}
+	public CompilerException() {}
 
 	/**
 	 * Creates a compiler exception with the given message and no cause.

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
index 1c58217..78e47a0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
@@ -16,19 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer;
 
 /**
- * An exception that is thrown by the pact compiler when encountering
+ * An exception that is thrown by the Optimizer when encountering
  * a problem during the optimizer post pass. This is a dedicated exception
  * because it is thrown by user-specified optimizer extensions.
  */
 public class CompilerPostPassException extends CompilerException {
-	
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
+
 	private static final long serialVersionUID = -322650826288034623L;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
index 1d43b76..cf6f4ec 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer;
 
 import java.util.HashMap;
@@ -60,11 +59,11 @@ public class DataStatistics {
 	 * Caches the given statistics. They are later retrievable under the given identifier.
 	 * 
 	 * @param statistics The statistics to cache.
-	 * @param identifyer The identifier which may be later used to retrieve the statistics.
+	 * @param identifier The identifier which may be later used to retrieve the statistics.
 	 */
-	public void cacheBaseStatistics(BaseStatistics statistics, String identifyer) {
+	public void cacheBaseStatistics(BaseStatistics statistics, String identifier) {
 		synchronized (this.baseStatisticsCache) {
-			this.baseStatisticsCache.put(identifyer, statistics);
+			this.baseStatisticsCache.put(identifier, statistics);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
new file mode 100644
index 0000000..e76501c
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -0,0 +1,1372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.optimizer.dag.GroupCombineNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.optimizer.dag.SortPartitionNode;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
+import org.apache.flink.optimizer.dag.CoGroupNode;
+import org.apache.flink.optimizer.dag.CollectorMapNode;
+import org.apache.flink.optimizer.dag.CrossNode;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.FilterNode;
+import org.apache.flink.optimizer.dag.FlatMapNode;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.MapNode;
+import org.apache.flink.optimizer.dag.MapPartitionNode;
+import org.apache.flink.optimizer.dag.JoinNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.PartitionNode;
+import org.apache.flink.optimizer.dag.ReduceNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
+import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.optimizer.postpass.OptimizerPostPass;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The optimizer that takes the user-specified program plan and creates an optimized plan that contains
+ * exact descriptions of how the physical execution will take place. It first translates the user
+ * program into an internal optimizer representation and then chooses between different alternatives
+ * for shipping strategies and local strategies.
+ * <p>
+ * The basic principle is taken from optimizer work in systems such as Volcano/Cascades and Selinger/System-R/DB2. The
+ * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources, generating
+ * alternative plans, pruning against the interesting properties.
+ * <p>
+ * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
+ * sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
+ */
+public class Optimizer {
+
+	// ------------------------------------------------------------------------
+	// Constants
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
+	 * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
+	 * If the operator has two input channels, the shipping strategy is applied to both input channels.
+	 */
+	public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
+
+	/**
+	 * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
+	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
+	 * to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
+	 */
+	public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
+
+	/**
+	 * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
+	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
+	 * to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
+	 */
+	public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
+
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
+	 * input channel, i.e. no redistribution of any kind.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a random repartition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION = "SHIP_REPARTITION";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a range-partition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
+
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
+	 * input channel.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
+
+	/**
+	 * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
+	 * parameters. The corresponding value tells the compiler which local strategy to use to process the
+	 * data inside one partition.
+	 * <p>
+	 * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
+	 * have no choice in their local strategy (such as <i>Cross</i>).
+	 */
+	public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
+
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
+	 * For example, a <i>Reduce</i> operator will sort the data to group it.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
+	 * During sorting, a combine method is repeatedly applied to reduce the data volume.
+	 * For example, a <i>Reduce</i> operator will sort the data to group it.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
+	 * inputs, with subsequent merging.
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
+	 * The first input is sorted; the second input is assumed to be already sorted. After sorting, both inputs are merged.
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
+	 * The second input is sorted; the first input is assumed to be already sorted. After sorting, both inputs are merged.
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
+	 * Both inputs are assumed to be sorted and are merged. 
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
+
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
+	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
+	 * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
+	 * used to probe the table.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
+	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
+	 * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
+	 * used to probe the table.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
+	 * Hence, the data of the first input is streamed through, while the data of the second input is stored on
+	 * disk and repeatedly read.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
+	 * Hence, the data of the second input is streamed through, while the data of the first input is stored on
+	 * disk and repeatedly read.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
+	 * Furthermore, the first input, being the outer side, will be processed in blocks, and for each block, the
+	 * second input, being the inner side, will be read repeatedly from disk.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
+	 * Furthermore, the second input, being the outer side, will be processed in blocks, and for each block, the
+	 * first input, being the inner side, will be read repeatedly from disk.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
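+
+	// Editor's note - a hypothetical usage sketch, not part of this commit:
+	// the hints above are passed as stub parameters of an operator, keyed by
+	// these constants. Assuming a Cross operator 'cross' and the (assumed)
+	// Operator#setParameter API:
+	//
+	//   cross.setParameter(Optimizer.HINT_LOCAL_STRATEGY,
+	//                      Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);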
+	
+	/**
+	 * The log handle that is used by the compiler to log messages.
+	 */
+	public static final Logger LOG = LoggerFactory.getLogger(Optimizer.class);
+
+	// ------------------------------------------------------------------------
+	// Members
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The statistics object used to obtain statistics, such as input sizes,
+	 * for the cost estimation process.
+	 */
+	private final DataStatistics statistics;
+
+	/**
+	 * The cost estimator used by the compiler.
+	 */
+	private final CostEstimator costEstimator;
+
+	/**
+	 * The default degree of parallelism for jobs compiled by this compiler.
+	 */
+	private int defaultDegreeOfParallelism;
+
+
+	// ------------------------------------------------------------------------
+	// Constructor & Setup
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
+	 * inputs and can hence not determine any properties. It will perform all optimization with
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
+	 */
+	public Optimizer() {
+		this(null, new DefaultCostEstimator());
+	}
+
+	/**
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
+	 * 
+	 * @param stats
+	 *        The statistics to be used to determine the input properties.
+	 */
+	public Optimizer(DataStatistics stats) {
+		this(stats, new DefaultCostEstimator());
+	}
+
+	/**
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
+	 * inputs and can hence not determine any properties. It will perform all optimization with
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
+	 * 
+	 * @param estimator The cost estimator to use to cost the individual operations.
+	 */
+	public Optimizer(CostEstimator estimator) {
+		this(null, estimator);
+	}
+
+	/**
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
+	 * 
+	 * @param stats
+	 *        The statistics to be used to determine the input properties.
+	 * @param estimator
+	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
+	 */
+	public Optimizer(DataStatistics stats, CostEstimator estimator) {
+		this.statistics = stats;
+		this.costEstimator = estimator;
+
+		// determine the default parallelism
+		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		
+		if (defaultDegreeOfParallelism < 1) {
+			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
+					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY + " is invalid. Ignoring and using a value of 1.");
+			this.defaultDegreeOfParallelism = 1;
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//                             Getters / Setters
+	// ------------------------------------------------------------------------
+	
+	public int getDefaultDegreeOfParallelism() {
+		return defaultDegreeOfParallelism;
+	}
+	
+	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
+		if (defaultDegreeOfParallelism > 0) {
+			this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
+		} else {
+			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//                               Compilation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
+	 * and all channels have a shipping strategy assigned.
+	 *
+	 * For more details on the optimization phase, see the comments for
+	 * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
+	 * 
+	 * @param program The program to be translated.
+	 * @return The optimized plan.
+	 *
+	 * @throws CompilerException
+	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
+	 *         situation during the compilation process.
+	 */
+	public OptimizedPlan compile(Plan program) throws CompilerException {
+		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
+		return compile(program, postPasser);
+	}
+
+	/**
+	 * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
+	 * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
+	 * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
+	 * where to cache intermediate results, etc.
+	 *
+	 * The optimization happens in multiple phases:
+	 * <ol>
+	 *     <li>Create the optimizer DAG representation of the program: build <tt>OptimizerNode</tt>
+	 *     representations of the operators, assign parallelism, and compute size estimates.</li>
+	 * <li>Compute interesting properties and auxiliary structures.</li>
+	 * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation
+	 * (unlike in the classical database approaches), because we support plans that are not trees.</li>
+	 * </ol>
+	 * 
+	 * @param program The program to be translated.
+	 * @param postPasser The function to be used for post passing the optimizer's plan and setting the
+	 *                   data type specific serialization routines.
+	 * @return The optimized plan.
+	 * 
+	 * @throws CompilerException
+	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
+	 *         situation during the compilation process.
+	 */
+	private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
+		if (program == null || postPasser == null) {
+			throw new NullPointerException();
+		}
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
+		}
+
+		final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
+
+		final int defaultParallelism = program.getDefaultParallelism() > 0 ?
+			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
+
+		// log the default settings
+		LOG.debug("Using a default parallelism of {}",  defaultParallelism);
+		LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
+
+		// the first step in the compilation is to create the optimizer plan representation
+		// this step does the following:
+		// 1) It creates an optimizer plan node for each operator
+		// 2) It connects them via channels
+		// 3) It looks for hints about local strategies and channel types and
+		// sets the types and strategies accordingly
+		// 4) It makes estimates about the data volume of the data sources and
+		// propagates those estimates through the plan
+
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
+		program.accept(graphCreator);
+
+		// if the plan has multiple data sinks, add logical optimizer nodes that each have two data sinks as
+		// children, until only a single root node remains. This allows us to deal transparently with
+		// nodes that have multiple outputs
+		OptimizerNode rootNode;
+		if (graphCreator.sinks.size() == 1) {
+			rootNode = graphCreator.sinks.get(0);
+		} else if (graphCreator.sinks.size() > 1) {
+			Iterator<DataSinkNode> iter = graphCreator.sinks.iterator();
+			rootNode = iter.next();
+
+			while (iter.hasNext()) {
+				rootNode = new SinkJoiner(rootNode, iter.next());
+			}
+		} else {
+			throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
+		}
+
+		// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
+		// guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks.
+		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
+
+		// We are dealing with operator DAGs, rather than operator trees.
+		// That requires us to deviate at some points from the classical DB optimizer algorithms.
+		// This step builds some auxiliary structures to help track branches and joins in the DAG.
+		BranchesVisitor branchingVisitor = new BranchesVisitor();
+		rootNode.accept(branchingVisitor);
+
+		// Propagate the interesting properties top-down through the graph
+		InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
+		rootNode.accept(propsVisitor);
+		
+		// perform a sanity check: the root may not have any unclosed branches
+		if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
+			throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
+					"track the re-joining of branches correctly.");
+		}
+
+		// the final step is now to generate the actual plan alternatives
+		List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
+
+		if (bestPlan.size() != 1) {
+			throw new CompilerException("Error in compiler: more than one best plan was created!");
+		}
+
+		// check if the best plan's root is a data sink (single sink plan)
+		// if so, directly take it. if it is a sink joiner node, get its contained sinks
+		PlanNode bestPlanRoot = bestPlan.get(0);
+		List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
+
+		if (bestPlanRoot instanceof SinkPlanNode) {
+			bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
+		} else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
+			((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
+		}
+		
+		DeadlockPreventer dp = new DeadlockPreventer();
+		dp.resolveDeadlocks(bestPlanSinks);
+
+		// finalize the plan
+		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
+		
+		plan.accept(new BinaryUnionReplacer());
+		
+		// post pass the plan. this is the phase where the serialization and comparator code is set
+		postPasser.postPass(plan);
+		
+		return plan;
+	}
+
+	/**
+	 * This function performs only the first step of the compilation process: the creation of the optimizer
+	 * representation of the plan. No estimations or enumerations of alternatives are done here.
+	 * 
+	 * @param program The plan to generate the optimizer representation for.
+	 * @return The optimizer representation of the plan, as a collection of all data sinks
+	 *         from which the plan can be traversed.
+	 */
+	public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
+		program.accept(graphCreator);
+		return graphCreator.sinks;
+	}
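+
+	// For illustration, this helper is used e.g. by LocalExecutor.getPlanAsJSON (see above):
+	//
+	//   List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
+	//   String json = new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);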
+	
+	// ------------------------------------------------------------------------
+	//                 Visitors for Compilation Traversals
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * This utility class performs the translation from the user-specified program to the optimizer plan.
+	 * It works as a visitor that walks the user's job in a depth-first fashion. During the descent, it creates
+	 * an optimizer node for each operator, data source, or sink. During the ascent, it connects
+	 * the nodes into the full graph.
+	 * <p>
+	 * This translator relies on the <code>setInputs</code> method in the nodes. As that method implements the size
+	 * estimation and the awareness of optimizer hints, the sizes will be properly estimated and the translated plan
+	 * already respects all optimizer hints.
+	 */
+	public static final class GraphCreatingVisitor implements Visitor<Operator<?>> {
+		
+		private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
+																// corresponding optimizer nodes
+
+		private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
+
+		private final int defaultParallelism; // the default degree of parallelism
+		
+		private final GraphCreatingVisitor parent;	// reference to enclosing creator, in case of a recursive translation
+
+		private final ExecutionMode defaultDataExchangeMode;
+
+		private final boolean forceDOP;
+
+		
+		public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
+			this(null, false, defaultParallelism, defaultDataExchangeMode, null);
+		}
+
+		private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
+									ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
+			if (closure == null){
+				con2node = new HashMap<Operator<?>, OptimizerNode>();
+			} else {
+				con2node = closure;
+			}
+
+			this.sinks = new ArrayList<DataSinkNode>(2);
+			this.defaultParallelism = defaultParallelism;
+			this.parent = parent;
+			this.defaultDataExchangeMode = dataExchangeMode;
+			this.forceDOP = forceDOP;
+		}
+
+		public List<DataSinkNode> getSinks() {
+			return sinks;
+		}
+
+		@SuppressWarnings("deprecation")
+		@Override
+		public boolean preVisit(Operator<?> c) {
+			// check if we have been here before
+			if (this.con2node.containsKey(c)) {
+				return false;
+			}
+
+			final OptimizerNode n;
+
+			// create a node for the operator (or sink or source) if we have not been here before
+			if (c instanceof GenericDataSinkBase) {
+				DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
+				this.sinks.add(dsn);
+				n = dsn;
+			}
+			else if (c instanceof GenericDataSourceBase) {
+				n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
+			}
+			else if (c instanceof MapOperatorBase) {
+				n = new MapNode((MapOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof MapPartitionOperatorBase) {
+				n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
+				n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof FlatMapOperatorBase) {
+				n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof FilterOperatorBase) {
+				n = new FilterNode((FilterOperatorBase<?, ?>) c);
+			}
+			else if (c instanceof ReduceOperatorBase) {
+				n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
+			}
+			else if (c instanceof GroupReduceOperatorBase) {
+				n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof GroupCombineOperatorBase) {
+				n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
+			}
+			else if (c instanceof JoinOperatorBase) {
+				n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
+			}
+			else if (c instanceof CoGroupOperatorBase) {
+				n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
+			}
+			else if (c instanceof CrossOperatorBase) {
+				n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
+			}
+			else if (c instanceof BulkIterationBase) {
+				n = new BulkIterationNode((BulkIterationBase<?>) c);
+			}
+			else if (c instanceof DeltaIterationBase) {
+				n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
+			}
+			else if (c instanceof Union){
+				n = new BinaryUnionNode((Union<?>) c);
+			}
+			else if (c instanceof PartitionOperatorBase) {
+				n = new PartitionNode((PartitionOperatorBase<?>) c);
+			}
+			else if (c instanceof SortPartitionOperatorBase) {
+				n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
+			}
+			else if (c instanceof PartialSolutionPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
+				final PartialSolutionPlaceHolder<?> holder = (PartialSolutionPlaceHolder<?>) c;
+				final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
+				final BulkIterationNode containingIterationNode =
+							(BulkIterationNode) this.parent.con2node.get(enclosingIteration);
+				
+				// this case is reached only during the recursive translation of a step function
+				BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
+				p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+				n = p;
+			}
+			else if (c instanceof WorksetPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
+				final WorksetPlaceHolder<?> holder = (WorksetPlaceHolder<?>) c;
+				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
+				final WorksetIterationNode containingIterationNode =
+							(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
+				
+				// this case is reached only during the recursive translation of a step function
+				WorksetNode p = new WorksetNode(holder, containingIterationNode);
+				p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+				n = p;
+			}
+			else if (c instanceof SolutionSetPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
+				final SolutionSetPlaceHolder<?> holder = (SolutionSetPlaceHolder<?>) c;
+				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
+				final WorksetIterationNode containingIterationNode =
+							(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
+				
+				// this case is reached only during the recursive translation of a step function
+				SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
+				p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+				n = p;
+			}
+			else {
+				throw new IllegalArgumentException("Unknown operator type: " + c);
+			}
+
+			this.con2node.put(c, n);
+			
+			// set the parallelism only if it has not been set before. Some nodes have a fixed
+			// parallelism, such as the key-less reducer (all-reduce).
+			if (n.getParallelism() < 1) {
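+				// Example (illustrative): an operator with no explicit parallelism (-1) falls back
+				// to the visitor's default parallelism; inside an iteration (forceDOP == true), an
+				// explicit setting that differs from the iteration's parallelism is overridden, too.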
+				int par = c.getDegreeOfParallelism();
+				if (par > 0) {
+					if (this.forceDOP && par != this.defaultParallelism) {
+						par = this.defaultParallelism;
+						LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
+							"currently fixed to the parallelism of the surrounding operator (the iteration).");
+					}
+				} else {
+					par = this.defaultParallelism;
+				}
+				n.setDegreeOfParallelism(par);
+			}
+
+			return true;
+		}
+
+		@Override
+		public void postVisit(Operator<?> c) {
+			
+			OptimizerNode n = this.con2node.get(c);
+
+			// first connect to the predecessors
+			n.setInput(this.con2node, this.defaultDataExchangeMode);
+			n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
+			
+			// if the node represents a bulk iteration, we recursively translate the data flow now
+			if (n instanceof BulkIterationNode) {
+				final BulkIterationNode iterNode = (BulkIterationNode) n;
+				final BulkIterationBase<?> iter = iterNode.getIterationContract();
+
+				// pass a copy of the non-iterative part into the iteration translation,
+				// in case the iteration references its closure
+				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
+
+				// first, recursively build the data flow for the step function
+				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
+					iterNode.getParallelism(), defaultDataExchangeMode, closure);
+				
+				iter.getNextPartialSolution().accept(recursiveCreator);
+
+				BulkPartialSolutionNode partialSolution =
+						(BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
+				OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
+				if (partialSolution == null) {
+					throw new CompilerException("Error: The step function's result does not depend on the partial solution.");
+				}
+
+				OptimizerNode terminationCriterion = null;
+				
+				if (iter.getTerminationCriterion() != null) {
+					terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
+					
+					// no intermediate node yet, traverse from the termination criterion to build the missing parts
+					if (terminationCriterion == null) {
+						iter.getTerminationCriterion().accept(recursiveCreator);
+						terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
+					}
+				}
+				
+				iterNode.setPartialSolution(partialSolution);
+				iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
+				
+				// go over the contained data flow and mark the dynamic path nodes
+				StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
+				iterNode.acceptForStepFunction(identifier);
+			}
+			else if (n instanceof WorksetIterationNode) {
+				final WorksetIterationNode iterNode = (WorksetIterationNode) n;
+				final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
+
+				// we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
+				// One check comes for free during the translation; we do the other check here as a precondition.
+				{
+					StepFunctionValidator wsf = new StepFunctionValidator();
+					iter.getNextWorkset().accept(wsf);
+					if (!wsf.foundWorkset) {
+						throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
+															"This is a prerequisite in delta iterations.");
+					}
+				}
+				
+				// calculate the closure of the anonymous function
+				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
+
+				// first, recursively build the data flow for the step function
+				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
+						this, true, iterNode.getParallelism(), defaultDataExchangeMode, closure);
+				
+				// descend from the solution set delta. check that it depends on both the workset
+				// and the solution set. If it does depend on both, this descent should create both nodes
+				iter.getSolutionSetDelta().accept(recursiveCreator);
+				
+				final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
+				
+				if (worksetNode == null) {
+					throw new CompilerException("In the given program, the solution set delta does not depend on the workset. " +
+														"This is a prerequisite in delta iterations.");
+				}
+				
+				iter.getNextWorkset().accept(recursiveCreator);
+				
+				SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
+				
+				if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
+					solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
+				}
+				else {
+					for (DagConnection conn : solutionSetNode.getOutgoingConnections()) {
+						OptimizerNode successor = conn.getTarget();
+					
+						if (successor.getClass() == JoinNode.class) {
+							// find out which input of the join the solution set is
+							JoinNode mn = (JoinNode) successor;
+							if (mn.getFirstPredecessorNode() == solutionSetNode) {
+								mn.makeJoinWithSolutionSet(0);
+							} else if (mn.getSecondPredecessorNode() == solutionSetNode) {
+								mn.makeJoinWithSolutionSet(1);
+							} else {
+								throw new CompilerException();
+							}
+						}
+						else if (successor.getClass() == CoGroupNode.class) {
+							CoGroupNode cg = (CoGroupNode) successor;
+							if (cg.getFirstPredecessorNode() == solutionSetNode) {
+								cg.makeCoGroupWithSolutionSet(0);
+							} else if (cg.getSecondPredecessorNode() == solutionSetNode) {
+								cg.makeCoGroupWithSolutionSet(1);
+							} else {
+								throw new CompilerException();
+							}
+						}
+						else {
+							throw new InvalidProgramException(
+									"Error: The only operations allowed on the solution set are Join and CoGroup.");
+						}
+					}
+				}
+				
+				final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
+				final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
+				
+				// set the step function nodes to the iteration node
+				iterNode.setPartialSolution(solutionSetNode, worksetNode);
+				iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);
+				
+				// go over the contained data flow and mark the dynamic path nodes
+				StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
+				iterNode.acceptForStepFunction(pathIdentifier);
+			}
+		}
+	}
+	
+	private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
+		
+		private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
+		
+		private final int costWeight;
+		
+		private StaticDynamicPathIdentifier(int costWeight) {
+			this.costWeight = costWeight;
+		}
+		
+		@Override
+		public boolean preVisit(OptimizerNode visitable) {
+			return this.seenBefore.add(visitable);
+		}
+
+		@Override
+		public void postVisit(OptimizerNode visitable) {
+			visitable.identifyDynamicPath(this.costWeight);
+			
+			// check that there is no nested iteration on the dynamic path
+			if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
+				throw new CompilerException("Nested iterations are currently not supported.");
+			}
+		}
+	}
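+
+	// Note (illustrative): identifyDynamicPath(costWeight) marks a node as lying on the dynamic
+	// path, i.e., as re-executed in every iteration, so that its costs can later be scaled by
+	// the iteration's cost weight.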
+	
+	/**
+	 * Traversal that assigns a unique ID to each node in the optimizer DAG, lets the
+	 * connections compute their maximum path depths, and computes the output size
+	 * estimates for each node, based on the given data statistics.
+	 */
+	public static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
+		
+		private final DataStatistics statistics;
+
+		private int id = 1;
+		
+		public IdAndEstimatesVisitor(DataStatistics statistics) {
+			this.statistics = statistics;
+		}
+
+		@Override
+		public boolean preVisit(OptimizerNode visitable) {
+			return visitable.getId() == -1;
+		}
+
+		@Override
+		public void postVisit(OptimizerNode visitable) {
+			// the node ids
+			visitable.initId(this.id++);
+			
+			// connections need to figure out their maximum path depths
+			for (DagConnection conn : visitable.getIncomingConnections()) {
+				conn.initMaxDepth();
+			}
+			for (DagConnection conn : visitable.getBroadcastConnections()) {
+				conn.initMaxDepth();
+			}
+			
+			// the estimates
+			visitable.computeOutputEstimates(this.statistics);
+			
+			// if required, recurse into the step function
+			if (visitable instanceof IterationNode) {
+				((IterationNode) visitable).acceptForStepFunction(this);
+			}
+		}
+	}
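+
+	// Illustrative usage (assumed, not part of this commit): the visitor is run once over the
+	// DAG, starting from a root node that reaches all sinks, e.g.:
+	//
+	//   rootNode.accept(new IdAndEstimatesVisitor(statistics));
+	//
+	// After the run, every node has an id >= 1 and its output estimates are in place.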
+	
+	/**
+	 * Visitor that computes the interesting properties for each node in the plan. On its recursive
+	 * depth-first descent, it propagates all interesting properties top-down.
+	 */
+	public static final class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
+		
+		private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
+
+		/**
+		 * Creates a new visitor that computes the interesting properties for all nodes in the plan.
+		 * It uses the given cost estimator used to compute the maximal costs for an interesting property.
+		 * 
+		 * @param estimator
+		 *        The cost estimator to estimate the maximal costs for interesting properties.
+		 */
+		public InterestingPropertyVisitor(CostEstimator estimator) {
+			this.estimator = estimator;
+		}
+		
+		@Override
+		public boolean preVisit(OptimizerNode node) {
+			// The interesting properties must be computed on the descent. In case a node has multiple outputs,
+			// that computation must happen during the last descent.
+
+			if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
+				node.computeUnionOfInterestingPropertiesFromSuccessors();
+				node.computeInterestingPropertiesForInputs(this.estimator);
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public void postVisit(OptimizerNode visitable) {}
+	}
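+
+	// Illustrative usage (assumed): interesting properties are seeded at the root/sinks and
+	// pushed down toward the sources, e.g.:
+	//
+	//   rootNode.accept(new InterestingPropertyVisitor(costEstimator));
+	//
+	// The guard in preVisit() ensures that a node with multiple consumers is processed only
+	// once all of its outgoing connections carry interesting properties.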
+
+	/**
+	 * On its re-ascent (post visit), this visitor computes the auxiliary structures needed to support plans
+	 * that are not a minimally connected DAG (such plans are not trees; at least one node feeds its
+	 * output into more than one other node).
+	 */
+	public static final class BranchesVisitor implements Visitor<OptimizerNode> {
+		
+		@Override
+		public boolean preVisit(OptimizerNode node) {
+			return node.getOpenBranches() == null;
+		}
+
+		@Override
+		public void postVisit(OptimizerNode node) {
+			if (node instanceof IterationNode) {
+				((IterationNode) node).acceptForStepFunction(this);
+			}
+
+			node.computeUnclosedBranchStack();
+		}
+	}
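+
+	// Example (illustrative): in a diamond-shaped plan where one source feeds two mappers that
+	// are later joined, the source is an "open branch" for both join inputs; the branch stacks
+	// computed here let the plan enumeration detect that both inputs must agree on the same
+	// candidate for the shared source.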
+	
+	/**
+	 * Finalization of the plan:
+	 *  - The graph of nodes is double-linked (links from child to parent are inserted)
+	 *  - If unions join static and dynamic paths, the cache is marked as a memory consumer
+	 *  - Relative memory fractions are assigned to all nodes.
+	 *  - All nodes are collected into a set.
+	 */
+	private static final class PlanFinalizer implements Visitor<PlanNode> {
+		
+		private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
+
+		private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
+
+		private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
+		
+		private final Deque<IterationPlanNode> stackOfIterationNodes;
+
+		private int memoryConsumerWeights; // a counter of all memory consumers
+
+		/**
+		 * Creates a new plan finalizer.
+		 */
+		private PlanFinalizer() {
+			this.allNodes = new HashSet<PlanNode>();
+			this.sources = new ArrayList<SourcePlanNode>();
+			this.sinks = new ArrayList<SinkPlanNode>();
+			this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
+		}
+
+		private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
+			this.memoryConsumerWeights = 0;
+			
+			// traverse the graph
+			for (SinkPlanNode node : sinks) {
+				node.accept(this);
+			}
+
+			// assign the memory to each node
+			if (this.memoryConsumerWeights > 0) {
+				for (PlanNode node : this.allNodes) {
+					// assign memory to the driver strategy of the node
+					final int consumerWeight = node.getMemoryConsumerWeight();
+					if (consumerWeight > 0) {
+						final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
+						node.setRelativeMemoryPerSubtask(relativeMem);
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
+								node.getProgramOperator().getName() + ".");
+						}
+					}
+					
+					// assign memory to the local and global strategies of the channels
+					for (Channel c : node.getInputs()) {
+						if (c.getLocalStrategy().dams()) {
+							final double relativeMem = 1.0 / this.memoryConsumerWeights;
+							c.setRelativeMemoryLocalStrategy(relativeMem);
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
+										"instance of " + c + ".");
+							}
+						}
+						if (c.getTempMode() != TempMode.NONE) {
+							final double relativeMem = 1.0 / this.memoryConsumerWeights;
+							c.setRelativeTempMemory(relativeMem);
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
+										"table for " + c + ".");
+							}
+						}
+					}
+				}
+			}
+			return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
+		}
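+
+		// Worked example (illustrative): with a total consumer weight of 4, a node whose driver
+		// strategy has weight 2 gets 2/4 = 0.5 of the managed memory per subtask, while every
+		// damming local strategy and every temp barrier gets 1/4 = 0.25.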
+
+		@Override
+		public boolean preVisit(PlanNode visitable) {
+			// if we come here again, prevent a further descent
+			if (!this.allNodes.add(visitable)) {
+				return false;
+			}
+			
+			if (visitable instanceof SinkPlanNode) {
+				this.sinks.add((SinkPlanNode) visitable);
+			}
+			else if (visitable instanceof SourcePlanNode) {
+				this.sources.add((SourcePlanNode) visitable);
+			}
+			else if (visitable instanceof BinaryUnionPlanNode) {
+				BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
+				if (unionNode.unionsStaticAndDynamicPath()) {
+					unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
+				}
+			}
+			else if (visitable instanceof BulkPartialSolutionPlanNode) {
+				// tell the partial solution about the iteration node that contains it
+				final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
+				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+				
+				// sanity check!
+				if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
+					throw new CompilerException("Bug: Error finalizing the plan. " +
+							"Cannot associate the node for a partial solution with its containing iteration.");
+				}
+				pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
+			}
+			else if (visitable instanceof WorksetPlanNode) {
+				// tell the partial solution about the iteration node that contains it
+				final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
+				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+				
+				// sanity check!
+				if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+					throw new CompilerException("Bug: Error finalizing the plan. " +
+							"Cannot associate the node for a partial solution with its containing iteration.");
+				}
+				wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+			}
+			else if (visitable instanceof SolutionSetPlanNode) {
+				// tell the partial solution about the iteration node that contains it
+				final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
+				final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+				
+				// sanity check!
+				if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+					throw new CompilerException("Bug: Error finalizing the plan. " +
+							"Cannot associate the node for a partial solution with its containing iteration.");
+				}
+				sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+			}
+			
+			// double-connect the connections: previously, only the parents knew their children,
+			// because one child candidate could have been referenced by multiple parents.
+			for (Channel conn : visitable.getInputs()) {
+				conn.setTarget(visitable);
+				conn.getSource().addOutgoingChannel(conn);
+			}
+			
+			for (Channel c : visitable.getBroadcastInputs()) {
+				c.setTarget(visitable);
+				c.getSource().addOutgoingChannel(c);
+			}
+
+			// count the memory consumption
+			this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
+			for (Channel c : visitable.getInputs()) {
+				if (c.getLocalStrategy().dams()) {
+					this.memoryConsumerWeights++;
+				}
+				if (c.getTempMode() != TempMode.NONE) {
+					this.memoryConsumerWeights++;
+				}
+			}
+			for (Channel c : visitable.getBroadcastInputs()) {
+				if (c.getLocalStrategy().dams()) {
+					this.memoryConsumerWeights++;
+				}
+				if (c.getTempMode() != TempMode.NONE) {
+					this.memoryConsumerWeights++;
+				}
+			}
+			
+			// pass the visitor to the iteration's step function
+			if (visitable instanceof IterationPlanNode) {
+				// push the iteration node onto the stack
+				final IterationPlanNode iterNode = (IterationPlanNode) visitable;
+				this.stackOfIterationNodes.addLast(iterNode);
+				
+				// recurse
+				((IterationPlanNode) visitable).acceptForStepFunction(this);
+				
+				// pop the iteration node from the stack
+				this.stackOfIterationNodes.removeLast();
+			}
+			return true;
+		}
+
+		@Override
+		public void postVisit(PlanNode visitable) {}
+	}
+	
+	/**
+	 * A visitor that traverses the graph and collapses cascading binary unions into a single n-ary
+	 * union operator. The exception is when one of the union inputs is materialized, such as in the
+	 * static-code-path cache in iterations.
+	 */
+	private static final class BinaryUnionReplacer implements Visitor<PlanNode> {
+		
+		private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();
+
+		@Override
+		public boolean preVisit(PlanNode visitable) {
+			if (this.seenBefore.add(visitable)) {
+				if (visitable instanceof IterationPlanNode) {
+					((IterationPlanNode) visitable).acceptForStepFunction(this);
+				}
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public void postVisit(PlanNode visitable) {
+			
+			if (visitable instanceof BinaryUnionPlanNode) {
+				
+				final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
+				final Channel in1 = unionNode.getInput1();
+				final Channel in2 = unionNode.getInput2();
+			
+				if (!unionNode.unionsStaticAndDynamicPath()) {
+					
+					// both inputs are on the static path, or both on the dynamic path; we can collapse them
+					NAryUnionPlanNode newUnionNode;
+
+					List<Channel> inputs = new ArrayList<Channel>();
+					collect(in1, inputs);
+					collect(in2, inputs);
+
+					newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, 
+							unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
+					
+					newUnionNode.setParallelism(unionNode.getParallelism());
+
+					for (Channel c : inputs) {
+						c.setTarget(newUnionNode);
+					}
+
+					for (Channel channel : unionNode.getOutgoingChannels()) {
+						channel.swapUnionNodes(newUnionNode);
+						newUnionNode.addOutgoingChannel(channel);
+					}
+				}
+				else {
+					// a union between the static and the dynamic path. for now, we handle this
+					// through a special union operator
+
+					// make sure that the first input is the cached (static) one and the second input is the dynamic one
+					if (in1.isOnDynamicPath()) {
+						BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);
+						
+						in1.setTarget(newUnionNode);
+						in2.setTarget(newUnionNode);
+						
+						for (Channel channel : unionNode.getOutgoingChannels()) {
+							channel.swapUnionNodes(newUnionNode);
+							newUnionNode.addOutgoingChannel(channel);
+						}
+					}
+				}
+			}
+		}
+		
+		private void collect(Channel in, List<Channel> inputs) {
+			if (in.getSource() instanceof NAryUnionPlanNode) {
+				// sanity check
+				if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
+					throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
+				}
+				if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) {
+					throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
+				}
+				
+				inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
+			} else {
+				// is not a collapsed union node, so we take the channel directly
+				inputs.add(in);
+			}
+		}
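+
+		// Example (illustrative): for u1 = union(a, b) and u2 = union(u1, c) on the same path,
+		// u1 is collapsed first (post-order), so collect() flattens its NAryUnionPlanNode and
+		// u2 ends up as a single n-ary union with the input channels [a, b, c].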
+	}
+	
+	private static final class StepFunctionValidator implements Visitor<Operator<?>> {
+
+		private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
+		
+		private boolean foundWorkset;
+		
+		@Override
+		public boolean preVisit(Operator<?> visitable) {
+			if (visitable instanceof WorksetPlaceHolder) {
+				foundWorkset = true;
+			}
+			
+			return (!foundWorkset) && seenBefore.add(visitable);
+		}
+
+		@Override
+		public void postVisit(Operator<?> visitable) {}
+	}
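+
+	// Example (illustrative): for nextWorkset = workset.map(...).reduce(...), this validator
+	// reaches the WorksetPlaceHolder on its descent, sets foundWorkset, and stops traversing.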
+
+	// ------------------------------------------------------------------------
+	// Miscellaneous
+	// ------------------------------------------------------------------------
+	
+	private OptimizerPostPass getPostPassFromPlan(Plan program) {
+		final String className =  program.getPostPassClassName();
+		if (className == null) {
+			throw new CompilerException("Optimizer Post Pass class description is null");
+		}
+		try {
+			Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
+			try {
+				return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
+			} catch (RuntimeException rtex) {
+				// unwrap the source exception
+				if (rtex.getCause() != null) {
+					throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
+				} else {
+					throw rtex;
+				}
+			}
+		} catch (ClassNotFoundException cnfex) {
+			throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
+		} catch (ClassCastException ccex) {
+			throw new CompilerException("Class '" + className + "' is not an optimizer post pass.", ccex);
+		}
+	}
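+
+	// Note (assumed, for illustration): for Java API programs, the post pass class name is
+	// expected to resolve to a class like org.apache.flink.optimizer.postpass.JavaApiPostPass,
+	// which the method above loads reflectively and instantiates via InstantiationUtil.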
+}