You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:26 UTC
[47/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 0dd7e93..ff0e004 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -47,7 +47,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
@@ -101,7 +101,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch (Exception e) {
e.printStackTrace();
@@ -162,13 +162,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
allSinks.add(out3Path);
for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+ String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
}
// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch (Exception e) {
e.printStackTrace();
@@ -262,7 +262,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
@@ -350,13 +350,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
allSinks.add(out3Path);
for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+ String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
}
// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch (Exception e) {
e.printStackTrace();
@@ -449,7 +449,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
@@ -495,7 +495,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
.input2(ma2)
.name("Match 2")
.build();
- mat2.setParameter(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_MERGE);
+ mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
@@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
@@ -555,13 +555,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
allSinks.add(out2Path);
for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath();
+ String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
}
// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index c17ebe8..3e7da6c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -51,7 +51,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
public void testRightSide() {
try {
- Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+ Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
OptimizedPlan oPlan = compileNoStats(plan);
@@ -63,7 +63,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -79,7 +79,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
public void testRightSideCountercheck() {
try {
- Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+ Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
OptimizedPlan oPlan = compileNoStats(plan);
@@ -91,7 +91,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -108,7 +108,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
public void testLeftSide() {
try {
- Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+ Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
OptimizedPlan oPlan = compileNoStats(plan);
@@ -120,7 +120,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -136,7 +136,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
public void testLeftSideCountercheck() {
try {
- Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+ Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
OptimizedPlan oPlan = compileNoStats(plan);
@@ -148,7 +148,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -191,7 +191,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -212,10 +212,10 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
Configuration joinStrategy = new Configuration();
- joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+ joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
if(strategy != "") {
- joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
+ joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
}
DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
@@ -243,7 +243,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
Configuration joinStrategy = new Configuration();
- joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
+ joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
index 5265e3a..565d992 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -59,9 +59,9 @@ public abstract class CompilerTestBase implements java.io.Serializable {
protected transient DataStatistics dataStats;
- protected transient PactCompiler withStatsCompiler;
+ protected transient Optimizer withStatsCompiler;
- protected transient PactCompiler noStatsCompiler;
+ protected transient Optimizer noStatsCompiler;
private transient int statCounter;
@@ -70,10 +70,10 @@ public abstract class CompilerTestBase implements java.io.Serializable {
@Before
public void setup() {
this.dataStats = new DataStatistics();
- this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
+ this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
- this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
+ this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
}
@@ -111,7 +111,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
for (PlanNode n : p.getAllNodes()) {
- Operator<?> c = n.getOriginalOptimizerNode().getPactContract();
+ Operator<?> c = n.getOriginalOptimizerNode().getOperator();
String name = c.getName();
ArrayList<PlanNode> list = map.get(name);
@@ -124,7 +124,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
boolean shouldAdd = true;
for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
PlanNode in = iter.next();
- if (in.getOriginalOptimizerNode().getPactContract() == c) {
+ if (in.getOriginalOptimizerNode().getOperator() == c) {
// is this the child or is our node the child
if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
index 7fa331a..b17e777 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyMatchStub;
import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -317,7 +317,7 @@ public class DOPChangeTest extends CompilerTestBase {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index b8809d7..aaee975 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.junit.Test;
@SuppressWarnings("serial")
@@ -41,7 +41,7 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index d97f855..34aa9f8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -78,10 +78,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -136,13 +136,13 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, keyExtractor.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, keyProjector.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -192,10 +192,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 146c085..6dadc19 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.DummyCrossStub;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -74,7 +74,7 @@ public class HardPlansCompilationTest extends CompilerTestBase {
plan.setDefaultParallelism(DEFAULT_PARALLELISM);
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index dc6fcad..ac4f820 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -79,7 +79,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
// check that the JobGraphGenerator accepts the plan
- new NepheleJobGraphGenerator().compileJobGraph(p);
+ new JobGraphGenerator().compileJobGraph(p);
}
catch (Exception e) {
e.printStackTrace();
@@ -116,7 +116,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -152,7 +152,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -188,7 +188,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -222,7 +222,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
}
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index d6852f5..e65758f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
@@ -133,7 +133,7 @@ public class NestedIterationsTest extends CompilerTestBase {
OptimizedPlan op = compileNoStats(p);
// job graph generator should be able to translate this
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -171,7 +171,7 @@ public class NestedIterationsTest extends CompilerTestBase {
OptimizedPlan op = compileNoStats(p);
// job graph generator should be able to translate this
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index e5983d9..86f01b0 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -149,7 +149,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
- conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
@@ -171,7 +171,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
- conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
@@ -194,7 +194,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
- conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
@@ -217,7 +217,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf= new Configuration();
- conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 23fa311..f6885c5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityReduce;
@@ -51,7 +51,7 @@ public class ReduceAllTest extends CompilerTestBase {
try {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
} catch(CompilerException ce) {
ce.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 8356e94..1fe16bb 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -63,7 +63,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
oPlan.accept(new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) {
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperatorBase) {
for (Channel input: visitable.getInputs()) {
GlobalProperties gprops = visitable.getGlobalProperties();
LocalProperties lprops = visitable.getLocalProperties();
@@ -78,7 +78,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
lprops.getGroupedFields().contains(1));
}
}
- if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) {
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof MapOperatorBase) {
for (Channel input: visitable.getInputs()) {
GlobalProperties gprops = visitable.getGlobalProperties();
LocalProperties lprops = visitable.getLocalProperties();
@@ -124,7 +124,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
oPlan.accept(new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) {
+ if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof JoinOperatorBase) {
DualInputPlanNode node = ((DualInputPlanNode) visitable);
final Channel inConn1 = node.getInput1();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 6647483..92b4fc5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.junit.Test;
@SuppressWarnings("serial")
@@ -80,7 +80,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -133,7 +133,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
assertTrue(c.isOnDynamicPath());
}
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index cb4bce4..5d15ed8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityReduce;
@@ -78,7 +78,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
// Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
@@ -87,7 +87,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperator) {
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
for (Channel inConn : visitable.getInputs()) {
Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
inConn.getShipStrategy() == ShipStrategyType.FORWARD);
@@ -126,7 +126,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
// return the plan
Plan plan = env.createProgramPlan("Test union on new java-api");
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
// Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
@@ -139,7 +139,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
/* Test on the union output connections
* It must be under the GroupOperator and the strategy should be forward
*/
- if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof GroupReduceOperatorBase){
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase){
final Channel inConn = ((SingleInputPlanNode) visitable).getInput();
Assert.assertTrue("Union should just forward the Partitioning",
inConn.getShipStrategy() == ShipStrategyType.FORWARD );
@@ -156,7 +156,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
final Channel inConn = inputs.next();
PlanNode inNode = inConn.getSource();
Assert.assertTrue("Input of Union should be FlatMapOperators",
- inNode.getPactContract() instanceof FlatMapOperatorBase);
+ inNode.getProgramOperator() instanceof FlatMapOperatorBase);
Assert.assertTrue("Shipment strategy under union should partition the data",
inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index f327259..1e4124c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.junit.Test;
import static org.junit.Assert.fail;
@@ -44,7 +44,7 @@ public class UnionReplacementTest extends CompilerTestBase {
Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
- NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator();
+ JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index d10803e..80c0bda 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.junit.Test;
@SuppressWarnings("serial")
@@ -52,7 +52,7 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
jgg.compileJobGraph(op);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index f17842e..6e7c0a3 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyMatchStub;
import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
@@ -106,7 +106,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
@Test
@@ -150,7 +150,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
@Test
@@ -193,7 +193,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index e876fbb..0273659 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -75,7 +75,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
assertEquals(partitioner, partitioner1.getInput().getPartitioner());
assertEquals(partitioner, partitioner2.getInput().getPartitioner());
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -120,7 +120,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
assertEquals(partitioner, partitioner1.getInput().getPartitioner());
assertEquals(partitioner, partitioner2.getInput().getPartitioner());
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index f6d8d0e..d397ea2 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -64,17 +64,17 @@ public class CustomPartitioningTest extends CompilerTestBase {
SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(parallelism, sink.getDegreeOfParallelism());
+ assertEquals(parallelism, sink.getParallelism());
assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(parallelism, mapper.getDegreeOfParallelism());
+ assertEquals(parallelism, mapper.getParallelism());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
assertEquals(part, partitioner.getInput().getPartitioner());
- assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+ assertEquals(parallelism, partitioner.getParallelism());
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
- assertEquals(parallelism, balancer.getDegreeOfParallelism());
+ assertEquals(parallelism, balancer.getParallelism());
}
catch (Exception e) {
e.printStackTrace();
@@ -134,17 +134,17 @@ public class CustomPartitioningTest extends CompilerTestBase {
SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(parallelism, sink.getDegreeOfParallelism());
+ assertEquals(parallelism, sink.getParallelism());
assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(parallelism, mapper.getDegreeOfParallelism());
+ assertEquals(parallelism, mapper.getParallelism());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
assertEquals(part, partitioner.getInput().getPartitioner());
- assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+ assertEquals(parallelism, partitioner.getParallelism());
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
- assertEquals(parallelism, balancer.getDegreeOfParallelism());
+ assertEquals(parallelism, balancer.getParallelism());
}
catch (Exception e) {
e.printStackTrace();
@@ -206,23 +206,23 @@ public class CustomPartitioningTest extends CompilerTestBase {
SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource();
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(parallelism, sink.getDegreeOfParallelism());
+ assertEquals(parallelism, sink.getParallelism());
assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(parallelism, mapper.getDegreeOfParallelism());
+ assertEquals(parallelism, mapper.getParallelism());
assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy());
- assertEquals(parallelism, keyRemover.getDegreeOfParallelism());
+ assertEquals(parallelism, keyRemover.getParallelism());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
assertEquals(part, partitioner.getInput().getPartitioner());
- assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+ assertEquals(parallelism, partitioner.getParallelism());
assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy());
- assertEquals(parallelism, keyExtractor.getDegreeOfParallelism());
+ assertEquals(parallelism, keyExtractor.getParallelism());
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
- assertEquals(parallelism, balancer.getDegreeOfParallelism());
+ assertEquals(parallelism, balancer.getParallelism());
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
index 2c5d235..cb4bd78 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -246,7 +246,7 @@ public class DataExchangeModeClosedBranchingTest extends CompilerTestBase {
private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
for (SinkPlanNode node : collection) {
- String nodeName = node.getOptimizerNode().getPactContract().getName();
+ String nodeName = node.getOptimizerNode().getOperator().getName();
if (nodeName != null && nodeName.equals(name)) {
return node;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
index 6c0e88b..6b2691a 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -171,7 +171,7 @@ public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
for (SinkPlanNode node : collection) {
- String nodeName = node.getOptimizerNode().getPactContract().getName();
+ String nodeName = node.getOptimizerNode().getOperator().getName();
if (nodeName != null && nodeName.equals(name)) {
return node;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
index dae3c41..fe33635 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.SingleInputNode;
@@ -292,8 +292,8 @@ public class PipelineBreakingTest {
}
private static List<DataSinkNode> convertPlan(Plan p) {
- PactCompiler.GraphCreatingVisitor dagCreator =
- new PactCompiler.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+ Optimizer.GraphCreatingVisitor dagCreator =
+ new Optimizer.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
// create the DAG
p.accept(dagCreator);
@@ -312,8 +312,8 @@ public class PipelineBreakingTest {
rootNode = new SinkJoiner(rootNode, iter.next());
}
}
- rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null));
- rootNode.accept(new PactCompiler.BranchesVisitor());
+ rootNode.accept(new Optimizer.IdAndEstimatesVisitor(null));
+ rootNode.accept(new Optimizer.BranchesVisitor());
return sinks;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 32aeab9..a683968 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -73,9 +73,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
// check DOP
- assertEquals(1, sourceNode.getDegreeOfParallelism());
- assertEquals(1, reduceNode.getDegreeOfParallelism());
- assertEquals(1, sinkNode.getDegreeOfParallelism());
+ assertEquals(1, sourceNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -121,10 +121,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
// check DOP
- assertEquals(8, sourceNode.getDegreeOfParallelism());
- assertEquals(8, combineNode.getDegreeOfParallelism());
- assertEquals(1, reduceNode.getDegreeOfParallelism());
- assertEquals(1, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, sourceNode.getParallelism());
+ assertEquals(8, combineNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -172,9 +172,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -229,10 +229,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -285,12 +285,12 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, keyExtractor.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, keyProjector.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -351,13 +351,13 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, keyExtractor.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, keyProjector.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 54b24dd..37a8e81 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.junit.Test;
@@ -53,7 +53,7 @@ public class IterationCompilerTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -81,7 +81,7 @@ public class IterationCompilerTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -120,7 +120,7 @@ public class IterationCompilerTest extends CompilerTestBase {
assertTrue(union.getCostWeight() >= 1);
// see that the jobgraph generator can translate this
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
@@ -179,7 +179,7 @@ public class IterationCompilerTest extends CompilerTestBase {
assertTrue(solutionDeltaUnion.isOnDynamicPath());
assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
- new NepheleJobGraphGenerator().compileJobGraph(op);
+ new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index bbdad4a..0724a9f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -72,9 +72,9 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check DOP
- assertEquals(1, sourceNode.getDegreeOfParallelism());
- assertEquals(1, reduceNode.getDegreeOfParallelism());
- assertEquals(1, sinkNode.getDegreeOfParallelism());
+ assertEquals(1, sourceNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -122,10 +122,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
// check DOP
- assertEquals(8, sourceNode.getDegreeOfParallelism());
- assertEquals(8, combineNode.getDegreeOfParallelism());
- assertEquals(1, reduceNode.getDegreeOfParallelism());
- assertEquals(1, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, sourceNode.getParallelism());
+ assertEquals(8, combineNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -180,10 +180,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -244,13 +244,13 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
- assertEquals(6, sourceNode.getDegreeOfParallelism());
- assertEquals(6, keyExtractor.getDegreeOfParallelism());
- assertEquals(6, combineNode.getDegreeOfParallelism());
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
- assertEquals(8, reduceNode.getDegreeOfParallelism());
- assertEquals(8, keyProjector.getDegreeOfParallelism());
- assertEquals(8, sinkNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 658cf7a..8720aa7 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.optimizer.CompilerTestBase;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -93,7 +93,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -137,7 +137,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -182,7 +182,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
- new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index 21d95ee..e7807c9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
@@ -42,8 +42,8 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
Configuration cfg = new Configuration();
- cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
- cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+ cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+ cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
input.coGroup(input).where(0).equalTo(0)
.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 4cfa189..9171cc7 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
@@ -41,8 +41,8 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
Configuration cfg = new Configuration();
- cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
- cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+ cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+ cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
input.join(input).where(0).equalTo(0)
.withParameters(cfg)
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 79e04fb..019345f 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -68,24 +68,24 @@ public class SpargelCompilerTest extends CompilerTestBase {
// check the sink
SinkPlanNode sink = op.getDataSinks().iterator().next();
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
// check the iteration
WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
// check the solution set join and the delta
PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
- assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
// check the workset set join
DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
- assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
assertTrue(edgeJoin.getInput1().getTempMode().isCached());
@@ -143,24 +143,24 @@ public class SpargelCompilerTest extends CompilerTestBase {
// check the sink
SinkPlanNode sink = op.getDataSinks().iterator().next();
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
// check the iteration
WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
// check the solution set join and the delta
PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
- assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
// check the workset set join
DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
- assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
assertTrue(edgeJoin.getInput1().getTempMode().isCached());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index acf20d3..788327a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
@@ -57,9 +57,9 @@ public abstract class CompilerTestBase {
protected DataStatistics dataStats;
- protected PactCompiler withStatsCompiler;
+ protected Optimizer withStatsCompiler;
- protected PactCompiler noStatsCompiler;
+ protected Optimizer noStatsCompiler;
private int statCounter;
@@ -68,10 +68,10 @@ public abstract class CompilerTestBase {
@Before
public void setup() {
this.dataStats = new DataStatistics();
- this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
+ this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
- this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
+ this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
}
@@ -113,7 +113,7 @@ public abstract class CompilerTestBase {
HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
for (PlanNode n : p.getAllNodes()) {
- Operator<?> c = n.getOriginalOptimizerNode().getPactContract();
+ Operator<?> c = n.getOriginalOptimizerNode().getOperator();
String name = c.getName();
ArrayList<PlanNode> list = map.get(name);
@@ -126,7 +126,7 @@ public abstract class CompilerTestBase {
boolean shouldAdd = true;
for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
PlanNode in = iter.next();
- if (in.getOriginalOptimizerNode().getPactContract() == c) {
+ if (in.getOriginalOptimizerNode().getOperator() == c) {
// is this the child or is our node the child
if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 7cd3ff0..67a4797 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -23,10 +23,10 @@ import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,14 +72,14 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
}
- PactCompiler pc = new PactCompiler(new DataStatistics());
+ Optimizer pc = new Optimizer(new DataStatistics());
OptimizedPlan op = pc.compile(p);
if (printPlan) {
System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
}
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
return jgg.compileJobGraph(op);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index db1fc4d..44f35e7 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.junit.Assert;
@@ -49,7 +49,7 @@ public class TestEnvironment extends ExecutionEnvironment {
try {
OptimizedPlan op = compileProgram(jobName);
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
ActorRef client = this.executor.getJobClient();
@@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);
- PactCompiler pc = new PactCompiler(new DataStatistics());
+ Optimizer pc = new Optimizer(new DataStatistics());
return pc.compile(p);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 9ab62e1..400ed3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -38,9 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -140,9 +140,9 @@ public abstract class CancellingTestBase {
}
private JobGraph getJobGraph(final Plan plan) throws Exception {
- final PactCompiler pc = new PactCompiler(new DataStatistics());
+ final Optimizer pc = new Optimizer(new DataStatistics());
final OptimizedPlan op = pc.compile(plan);
- final NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ final JobGraphGenerator jgg = new JobGraphGenerator();
return jgg.compileJobGraph(op);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
index 1b73c4b..b38b784 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -130,7 +130,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
// check the caches
Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode());
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
jgg.compileJobGraph(optPlan);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
index 2c23eaf..dcc9c15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -152,7 +152,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() ||
LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy());
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
jgg.compileJobGraph(optPlan);
}
@@ -233,7 +233,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() ||
LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy());
- NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+ JobGraphGenerator jgg = new JobGraphGenerator();
jgg.compileJobGraph(optPlan);
}