You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:26 UTC

[47/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 0dd7e93..ff0e004 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -47,7 +47,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
@@ -101,7 +101,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
 	
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -162,13 +162,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			allSinks.add(out3Path);
 			
 			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
 				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
 			}
 			
 			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -262,7 +262,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			
 			//Compile plan to verify that no error is thrown
 			jobGen.compileJobGraph(oPlan);
@@ -350,13 +350,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			allSinks.add(out3Path);
 			
 			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
 				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
 			}
 			
 			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -449,7 +449,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			
 			//Compile plan to verify that no error is thrown
 			jobGen.compileJobGraph(oPlan);
@@ -495,7 +495,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.input2(ma2)
 				.name("Match 2")
 				.build();
-			mat2.setParameter(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_MERGE);
+			mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
 			
 			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
 			
@@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			
 			//Compile plan to verify that no error is thrown
 			jobGen.compileJobGraph(oPlan);
@@ -555,13 +555,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			allSinks.add(out2Path);
 			
 			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
 				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
 			}
 			
 			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
 			
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index c17ebe8..3e7da6c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
@@ -51,7 +51,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	public void testRightSide() {
 		try {
 			
-			Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 	
@@ -63,7 +63,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
 			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -79,7 +79,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	public void testRightSideCountercheck() {
 		try {
 			
-			Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+			Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 	
@@ -91,7 +91,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
 			assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -108,7 +108,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	public void testLeftSide() {
 		try {
 			
-			Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+			Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 	
@@ -120,7 +120,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
 			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -136,7 +136,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	public void testLeftSideCountercheck() {
 		try {
 			
-			Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
 			
 			OptimizedPlan oPlan = compileNoStats(plan);
 	
@@ -148,7 +148,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
 			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -191,7 +191,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
 			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -212,10 +212,10 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
 		
 		Configuration joinStrategy = new Configuration();
-		joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+		joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
 		
 		if(strategy != "") {
-			joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
+			joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
 		}
 		
 		DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
@@ -243,7 +243,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
 		
 		Configuration joinStrategy = new Configuration();
-		joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
+		joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
 		
 		DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
index 5265e3a..565d992 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -59,9 +59,9 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	protected transient DataStatistics dataStats;
 	
-	protected transient PactCompiler withStatsCompiler;
+	protected transient Optimizer withStatsCompiler;
 	
-	protected transient PactCompiler noStatsCompiler;
+	protected transient Optimizer noStatsCompiler;
 	
 	private transient int statCounter;
 	
@@ -70,10 +70,10 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	@Before
 	public void setup() {
 		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
+		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
 		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 		
-		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
+		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
 		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 	}
 	
@@ -111,7 +111,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
 			
 			for (PlanNode n : p.getAllNodes()) {
-				Operator<?> c = n.getOriginalOptimizerNode().getPactContract();
+				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
 				String name = c.getName();
 				
 				ArrayList<PlanNode> list = map.get(name);
@@ -124,7 +124,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 				boolean shouldAdd = true;
 				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
 					PlanNode in = iter.next();
-					if (in.getOriginalOptimizerNode().getPactContract() == c) {
+					if (in.getOriginalOptimizerNode().getOperator() == c) {
 						// is this the child or is our node the child
 						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
 							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
index 7fa331a..b17e777 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyMatchStub;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -317,7 +317,7 @@ public class DOPChangeTest extends CompilerTestBase {
 		
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
-		NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+		JobGraphGenerator jobGen = new JobGraphGenerator();
 		
 		//Compile plan to verify that no error is thrown
 		jobGen.compileJobGraph(oPlan);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index b8809d7..aaee975 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -41,7 +41,7 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index d97f855..34aa9f8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -78,10 +78,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
 
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -136,13 +136,13 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
 
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -192,10 +192,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 146c085..6dadc19 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyCrossStub;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -74,7 +74,7 @@ public class HardPlansCompilationTest extends CompilerTestBase {
 		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
 		
 		OptimizedPlan oPlan = compileNoStats(plan);
-		NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+		JobGraphGenerator jobGen = new JobGraphGenerator();
 		jobGen.compileJobGraph(oPlan);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index dc6fcad..ac4f820 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -79,7 +79,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
 			
 			// check that the JobGraphGenerator accepts the plan
-			new NepheleJobGraphGenerator().compileJobGraph(p);
+			new JobGraphGenerator().compileJobGraph(p);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -116,7 +116,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -152,7 +152,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -188,7 +188,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
 			assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -222,7 +222,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
 			}
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index d6852f5..e65758f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
@@ -133,7 +133,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			OptimizedPlan op = compileNoStats(p);
 			
 			// job graph generator should be able to translate this
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -171,7 +171,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			OptimizedPlan op = compileNoStats(p);
 			
 			// job graph generator should be able to translate this
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index e5983d9..86f01b0 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -149,7 +149,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
 				Configuration conf= new Configuration();
-				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+				conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
@@ -171,7 +171,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
 				Configuration conf= new Configuration();
-				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+				conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
@@ -194,7 +194,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
 				Configuration conf= new Configuration();
-				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+				conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
@@ -217,7 +217,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
 				Configuration conf= new Configuration();
-				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+				conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 23fa311..f6885c5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;
@@ -51,7 +51,7 @@ public class ReduceAllTest extends CompilerTestBase {
 		
 		try {
 			OptimizedPlan oPlan = compileNoStats(plan);
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		} catch(CompilerException ce) {
 			ce.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 8356e94..1fe16bb 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -63,7 +63,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 		oPlan.accept(new Visitor<PlanNode>() {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) {
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperatorBase) {
 					for (Channel input: visitable.getInputs()) {
 						GlobalProperties gprops = visitable.getGlobalProperties();
 						LocalProperties lprops = visitable.getLocalProperties();
@@ -78,7 +78,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 								lprops.getGroupedFields().contains(1));
 					}
 				}
-				if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) {
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof MapOperatorBase) {
 					for (Channel input: visitable.getInputs()) {
 						GlobalProperties gprops = visitable.getGlobalProperties();
 						LocalProperties lprops = visitable.getLocalProperties();
@@ -124,7 +124,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 		oPlan.accept(new Visitor<PlanNode>() {
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) {
+				if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof JoinOperatorBase) {
 					DualInputPlanNode node = ((DualInputPlanNode) visitable);
 
 					final Channel inConn1 = node.getInput1();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 6647483..92b4fc5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -80,7 +80,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 			assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
 			assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -133,7 +133,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 				assertTrue(c.isOnDynamicPath());
 			}
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index cb4bce4..5d15ed8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;
@@ -78,7 +78,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 		
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
-		NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+		JobGraphGenerator jobGen = new JobGraphGenerator();
 		
 		// Compile plan to verify that no error is thrown
 		jobGen.compileJobGraph(oPlan);
@@ -87,7 +87,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 			
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperator) {
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
 					for (Channel inConn : visitable.getInputs()) {
 						Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
 								inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
@@ -126,7 +126,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 		// return the plan
 		Plan plan = env.createProgramPlan("Test union on new java-api");
 		OptimizedPlan oPlan = compileNoStats(plan);
-		NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+		JobGraphGenerator jobGen = new JobGraphGenerator();
 		
 		// Compile plan to verify that no error is thrown
 		jobGen.compileJobGraph(oPlan);
@@ -139,7 +139,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 				/* Test on the union output connections
 				 * It must be under the GroupOperator and the strategy should be forward
 				 */
-				if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof GroupReduceOperatorBase){
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase){
 					final Channel inConn = ((SingleInputPlanNode) visitable).getInput();
 					Assert.assertTrue("Union should just forward the Partitioning",
 							inConn.getShipStrategy() == ShipStrategyType.FORWARD ); 
@@ -156,7 +156,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 						final Channel inConn = inputs.next();
 						PlanNode inNode = inConn.getSource();
 						Assert.assertTrue("Input of Union should be FlatMapOperators",
-								inNode.getPactContract() instanceof FlatMapOperatorBase);
+								inNode.getProgramOperator() instanceof FlatMapOperatorBase);
 						Assert.assertTrue("Shipment strategy under union should partition the data",
 								inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH); 
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index f327259..1e4124c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
@@ -44,7 +44,7 @@ public class UnionReplacementTest extends CompilerTestBase {
 	
 			Plan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
-			NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index d10803e..80c0bda 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -52,7 +52,7 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
 			assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
 			
-			NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+			JobGraphGenerator jgg = new JobGraphGenerator();
 			jgg.compileJobGraph(op);
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index f17842e..6e7c0a3 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyMatchStub;
 import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
@@ -106,7 +106,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
 					(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
 		
-		new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+		new JobGraphGenerator().compileJobGraph(oPlan);
 	}
 	
 	@Test
@@ -150,7 +150,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
 		assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
 		
-		new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+		new JobGraphGenerator().compileJobGraph(oPlan);
 	}
 	
 	@Test
@@ -193,7 +193,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
 		assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
 		
-		new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+		new JobGraphGenerator().compileJobGraph(oPlan);
 	}
 	
 	private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index e876fbb..0273659 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -75,7 +75,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
 			assertEquals(partitioner, partitioner1.getInput().getPartitioner());
 			assertEquals(partitioner, partitioner2.getInput().getPartitioner());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -120,7 +120,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
 			assertEquals(partitioner, partitioner1.getInput().getPartitioner());
 			assertEquals(partitioner, partitioner2.getInput().getPartitioner());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index f6d8d0e..d397ea2 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -64,17 +64,17 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
 			
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			assertEquals(parallelism, sink.getParallelism());
 			
 			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			assertEquals(parallelism, mapper.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
 			assertEquals(part, partitioner.getInput().getPartitioner());
-			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			assertEquals(parallelism, partitioner.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
-			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+			assertEquals(parallelism, balancer.getParallelism());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -134,17 +134,17 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
 			
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			assertEquals(parallelism, sink.getParallelism());
 			
 			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			assertEquals(parallelism, mapper.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
 			assertEquals(part, partitioner.getInput().getPartitioner());
-			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			assertEquals(parallelism, partitioner.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
-			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+			assertEquals(parallelism, balancer.getParallelism());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -206,23 +206,23 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource();
 			
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			assertEquals(parallelism, sink.getParallelism());
 			
 			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			assertEquals(parallelism, mapper.getParallelism());
 			
 			assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy());
-			assertEquals(parallelism, keyRemover.getDegreeOfParallelism());
+			assertEquals(parallelism, keyRemover.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
 			assertEquals(part, partitioner.getInput().getPartitioner());
-			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			assertEquals(parallelism, partitioner.getParallelism());
 			
 			assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy());
-			assertEquals(parallelism, keyExtractor.getDegreeOfParallelism());
+			assertEquals(parallelism, keyExtractor.getParallelism());
 			
 			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
-			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+			assertEquals(parallelism, balancer.getParallelism());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
index 2c5d235..cb4bd78 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -246,7 +246,7 @@ public class DataExchangeModeClosedBranchingTest extends CompilerTestBase {
 
 	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
 		for (SinkPlanNode node : collection) {
-			String nodeName = node.getOptimizerNode().getPactContract().getName();
+			String nodeName = node.getOptimizerNode().getOperator().getName();
 			if (nodeName != null && nodeName.equals(name)) {
 				return node;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
index 6c0e88b..6b2691a 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -171,7 +171,7 @@ public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
 
 	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
 		for (SinkPlanNode node : collection) {
-			String nodeName = node.getOptimizerNode().getPactContract().getName();
+			String nodeName = node.getOptimizerNode().getOperator().getName();
 			if (nodeName != null && nodeName.equals(name)) {
 				return node;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
index dae3c41..fe33635 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
@@ -292,8 +292,8 @@ public class PipelineBreakingTest {
 	}
 
 	private static List<DataSinkNode> convertPlan(Plan p) {
-		PactCompiler.GraphCreatingVisitor dagCreator =
-				new PactCompiler.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+		Optimizer.GraphCreatingVisitor dagCreator =
+				new Optimizer.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
 
 		// create the DAG
 		p.accept(dagCreator);
@@ -312,8 +312,8 @@ public class PipelineBreakingTest {
 				rootNode = new SinkJoiner(rootNode, iter.next());
 			}
 		}
-		rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null));
-		rootNode.accept(new PactCompiler.BranchesVisitor());
+		rootNode.accept(new Optimizer.IdAndEstimatesVisitor(null));
+		rootNode.accept(new Optimizer.BranchesVisitor());
 
 		return sinks;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 32aeab9..a683968 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -73,9 +73,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
 			
 			// check DOP
-			assertEquals(1, sourceNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
+			assertEquals(1, sourceNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -121,10 +121,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
 			
 			// check DOP
-			assertEquals(8, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, combineNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, sourceNode.getParallelism());
+			assertEquals(8, combineNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -172,9 +172,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -229,10 +229,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -285,12 +285,12 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
 			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -351,13 +351,13 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
 			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 54b24dd..37a8e81 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.junit.Test;
 
@@ -53,7 +53,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -81,7 +81,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -120,7 +120,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			assertTrue(union.getCostWeight() >= 1);
 			
 			// see that the jobgraph generator can translate this
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -179,7 +179,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			assertTrue(solutionDeltaUnion.isOnDynamicPath());
 			assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
 			
-			new NepheleJobGraphGenerator().compileJobGraph(op);
+			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index bbdad4a..0724a9f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -72,9 +72,9 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(reduceNode, sinkNode.getInput().getSource());
 			
 			// check DOP
-			assertEquals(1, sourceNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
+			assertEquals(1, sourceNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -122,10 +122,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
 			
 			// check DOP
-			assertEquals(8, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, combineNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, sourceNode.getParallelism());
+			assertEquals(8, combineNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -180,10 +180,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -244,13 +244,13 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
 			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 658cf7a..8720aa7 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -93,7 +93,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
 						(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
 		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -137,7 +137,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
 			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -182,7 +182,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
 			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+			new JobGraphGenerator().compileJobGraph(oPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index 21d95ee..e7807c9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
@@ -42,8 +42,8 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			
 			Configuration cfg = new Configuration();
-			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
 			
 			input.coGroup(input).where(0).equalTo(0)
 				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 4cfa189..9171cc7 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
@@ -41,8 +41,8 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 			
 			Configuration cfg = new Configuration();
-			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+			cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
 			
 			input.join(input).where(0).equalTo(0)
 				.withParameters(cfg)

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 79e04fb..019345f 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -68,24 +68,24 @@ public class SpargelCompilerTest extends CompilerTestBase {
 			// check the sink
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
 			
 			// check the iteration
 			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
 			
 			// check the solution set join and the delta
 			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
 			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
 			
 			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
 			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
 			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
 			
 			// check the workset set join
 			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
 			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
 			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
@@ -143,24 +143,24 @@ public class SpargelCompilerTest extends CompilerTestBase {
 			// check the sink
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
 			
 			// check the iteration
 			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
 			
 			// check the solution set join and the delta
 			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
 			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
 			
 			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
 			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
 			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
 			
 			// check the workset set join
 			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
 			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
 			assertTrue(edgeJoin.getInput1().getTempMode().isCached());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index acf20d3..788327a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
@@ -57,9 +57,9 @@ public abstract class CompilerTestBase {
 	
 	protected DataStatistics dataStats;
 	
-	protected PactCompiler withStatsCompiler;
+	protected Optimizer withStatsCompiler;
 	
-	protected PactCompiler noStatsCompiler;
+	protected Optimizer noStatsCompiler;
 	
 	private int statCounter;
 	
@@ -68,10 +68,10 @@ public abstract class CompilerTestBase {
 	@Before
 	public void setup() {
 		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
+		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
 		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 		
-		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
+		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
 		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 	}
 	
@@ -113,7 +113,7 @@ public abstract class CompilerTestBase {
 			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
 			
 			for (PlanNode n : p.getAllNodes()) {
-				Operator<?> c = n.getOriginalOptimizerNode().getPactContract();
+				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
 				String name = c.getName();
 				
 				ArrayList<PlanNode> list = map.get(name);
@@ -126,7 +126,7 @@ public abstract class CompilerTestBase {
 				boolean shouldAdd = true;
 				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
 					PlanNode in = iter.next();
-					if (in.getOriginalOptimizerNode().getPactContract() == c) {
+					if (in.getOriginalOptimizerNode().getOperator() == c) {
 						// is this the child or is our node the child
 						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
 							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 7cd3ff0..67a4797 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -23,10 +23,10 @@ import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,14 +72,14 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 			Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
 		}
 		
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(p);
 		
 		if (printPlan) {
 			System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op)); 
 		}
 
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index db1fc4d..44f35e7 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Assert;
@@ -49,7 +49,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		try {
 			OptimizedPlan op = compileProgram(jobName);
 
-			NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+			JobGraphGenerator jgg = new JobGraphGenerator();
 			JobGraph jobGraph = jgg.compileJobGraph(op);
 
 			ActorRef client = this.executor.getJobClient();
@@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 	private OptimizedPlan compileProgram(String jobName) {
 		Plan p = createProgramPlan(jobName);
 
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		return pc.compile(p);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 9ab62e1..400ed3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -38,9 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
@@ -140,9 +140,9 @@ public abstract class CancellingTestBase {
 	}
 
 	private JobGraph getJobGraph(final Plan plan) throws Exception {
-		final PactCompiler pc = new PactCompiler(new DataStatistics());
+		final Optimizer pc = new Optimizer(new DataStatistics());
 		final OptimizedPlan op = pc.compile(plan);
-		final NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		final JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
index 1b73c4b..b38b784 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -130,7 +130,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
 		// check the caches
 		Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode());
 		
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		jgg.compileJobGraph(optPlan);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
index 2c23eaf..dcc9c15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -152,7 +152,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() ||
 							LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy());
 		
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		jgg.compileJobGraph(optPlan);
 	}
 	
@@ -233,7 +233,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() ||
 							LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy());
 		
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		jgg.compileJobGraph(optPlan);
 	}