You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/16 17:20:10 UTC
[3/3] flink git commit: [FLINK-2120][runtime] rename AbstractJobVertex to JobVertex
[FLINK-2120][runtime] rename AbstractJobVertex to JobVertex
This closes #840.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bd0af73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bd0af73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bd0af73
Branch: refs/heads/master
Commit: 1bd0af73892c812e6d16c5661cf10a9e2c13b3e4
Parents: 20d7e5a
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Jun 15 22:12:47 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Jun 16 17:18:45 2015 +0200
----------------------------------------------------------------------
.../client/program/ClientConnectionTest.java | 4 +-
.../plantranslate/JobGraphGenerator.java | 102 ++---
.../plantranslate/TempInIterationsTest.java | 4 +-
.../runtime/executiongraph/ExecutionGraph.java | 6 +-
.../executiongraph/ExecutionJobVertex.java | 10 +-
.../runtime/jobgraph/AbstractJobVertex.java | 405 -------------------
.../runtime/jobgraph/InputFormatVertex.java | 2 +-
.../runtime/jobgraph/IntermediateDataSet.java | 10 +-
.../apache/flink/runtime/jobgraph/JobEdge.java | 8 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 38 +-
.../flink/runtime/jobgraph/JobVertex.java | 405 +++++++++++++++++++
.../runtime/jobgraph/OutputFormatVertex.java | 2 +-
.../jobgraph/tasks/AbstractInvokable.java | 4 +-
.../jobmanager/scheduler/CoLocationGroup.java | 12 +-
.../checkpoint/CoordinatorShutdownTest.java | 6 +-
.../executiongraph/AllVerticesIteratorTest.java | 10 +-
.../ExecutionGraphConstructionTest.java | 102 ++---
.../ExecutionGraphDeploymentTest.java | 38 +-
.../executiongraph/ExecutionGraphTestUtils.java | 4 +-
.../ExecutionStateProgressTest.java | 4 +-
.../executiongraph/LocalInputSplitsTest.java | 6 +-
.../executiongraph/PointwisePatternTest.java | 44 +-
.../TerminalStateDeadlockTest.java | 8 +-
.../VertexLocationConstraintTest.java | 18 +-
.../executiongraph/VertexSlotSharingTest.java | 14 +-
.../flink/runtime/instance/SharedSlotsTest.java | 12 +-
.../PartialConsumePipelinedResultTest.java | 6 +-
.../flink/runtime/jobgraph/JobGraphTest.java | 74 ++--
.../runtime/jobgraph/JobTaskVertexTest.java | 10 +-
.../runtime/jobmanager/JobManagerTest.java | 4 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 6 +-
.../SlotCountExceedingParallelismTest.java | 6 +-
.../scheduler/CoLocationConstraintTest.java | 8 +-
.../ScheduleOrUpdateConsumersTest.java | 8 +-
.../runtime/taskmanager/TaskCancelTest.java | 10 +-
.../ExecutionGraphRestartTest.scala | 6 +-
.../TaskManagerLossFailsTasksTest.scala | 4 +-
.../jobmanager/CoLocationConstraintITCase.scala | 7 +-
.../runtime/jobmanager/JobManagerITCase.scala | 59 ++-
.../runtime/jobmanager/RecoveryITCase.scala | 16 +-
.../runtime/jobmanager/SlotSharingITCase.scala | 12 +-
.../TaskManagerFailsWithSlotSharingITCase.scala | 10 +-
.../api/graph/StreamingJobGraphGenerator.java | 20 +-
.../apache/flink/streaming/api/IterateTest.java | 12 +-
.../streaming/api/graph/SlotAllocationTest.java | 4 +-
.../BroadcastVarsNepheleITCase.java | 8 +-
.../KMeansIterativeNepheleITCase.java | 26 +-
.../JobSubmissionFailsITCase.java | 8 +-
.../ConnectedComponentsNepheleITCase.java | 58 +--
.../IterationWithChainingNepheleITCase.java | 8 +-
.../test/iterative/nephele/JobGraphUtils.java | 12 +-
.../CustomCompensatableDanglingPageRank.java | 10 +-
...mpensatableDanglingPageRankWithCombiner.java | 10 +-
.../CompensatableDanglingPageRank.java | 10 +-
.../runtime/NetworkStackThroughputITCase.java | 10 +-
.../jobmanager/JobManagerFailsITCase.scala | 6 +-
.../taskmanager/TaskManagerFailsITCase.scala | 14 +-
57 files changed, 869 insertions(+), 871 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index be6c19a..39b74a3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.client.program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.net.NetUtils;
@@ -87,7 +87,7 @@ public class ClientConnectionTest {
config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s");
try {
- AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(TestInvokable.class);
final JobGraph jg = new JobGraph("Test Job", vertex);
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 2630019..109be20 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +110,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// ------------------------------------------------------------------------
- private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to job vertices
+ private Map<PlanNode, JobVertex> vertices; // a map from optimizer nodes to job vertices
private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to job vertices
@@ -118,7 +118,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private List<TaskInChain> chainedTasksInSequence;
- private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
+ private List<JobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
private final int defaultMaxFan;
@@ -157,10 +157,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
* @return JobGraph generated from the plan.
*/
public JobGraph compileJobGraph(OptimizedPlan program) {
- this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
+ this.vertices = new HashMap<PlanNode, JobVertex>();
this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
this.chainedTasksInSequence = new ArrayList<TaskInChain>();
- this.auxVertices = new ArrayList<AbstractJobVertex>();
+ this.auxVertices = new ArrayList<JobVertex>();
this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
this.iterationStack = new ArrayList<IterationPlanNode>();
@@ -198,11 +198,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
graph.setAllowQueuedScheduling(false);
// add vertices to the graph
- for (AbstractJobVertex vertex : this.vertices.values()) {
+ for (JobVertex vertex : this.vertices.values()) {
graph.addVertex(vertex);
}
- for (AbstractJobVertex vertex : this.auxVertices) {
+ for (JobVertex vertex : this.auxVertices) {
graph.addVertex(vertex);
vertex.setSlotSharingGroup(sharingGroup);
}
@@ -251,7 +251,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
// the vertex to be created for the current node
- final AbstractJobVertex vertex;
+ final JobVertex vertex;
try {
if (node instanceof SinkPlanNode) {
vertex = createDataSinkVertex((SinkPlanNode) node);
@@ -322,7 +322,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// we adjust the joins / cogroups that go into the solution set here
for (Channel c : node.getOutgoingChannels()) {
DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
- AbstractJobVertex accessingVertex = this.vertices.get(target);
+ JobVertex accessingVertex = this.vertices.get(target);
TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());
int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1;
@@ -438,7 +438,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (node instanceof WorksetIterationPlanNode) {
// connect the initial solution set
WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node;
- AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
+ JobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
int inputIndex = headConfig.getDriverStrategy().getNumInputs();
headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
@@ -448,7 +448,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return;
}
- final AbstractJobVertex targetVertex = this.vertices.get(node);
+ final JobVertex targetVertex = this.vertices.get(node);
// --------- Main Path: Translation of channels ----------
@@ -480,7 +480,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
}
- AbstractJobVertex container = chainedTask.getContainingVertex();
+ JobVertex container = chainedTask.getContainingVertex();
if (container == null) {
final PlanNode sourceNode = inConn.getSource();
@@ -526,7 +526,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (this.currentIteration != null) {
- AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
+ JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
// the head may still be null if we descend into the static parts first
if (head != null) {
targetVertex.setStrictlyCoLocatedWith(head);
@@ -580,7 +580,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
}
- private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex,
+ private int translateChannel(Channel input, int inputIndex, JobVertex targetVertex,
TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
{
final PlanNode inputPlanNode = input.getSource();
@@ -656,7 +656,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
final PlanNode sourceNode = inConn.getSource();
- AbstractJobVertex sourceVertex = this.vertices.get(sourceNode);
+ JobVertex sourceVertex = this.vertices.get(sourceNode);
TaskConfig sourceVertexConfig;
if (sourceVertex == null) {
@@ -740,7 +740,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// Methods for creating individual vertices
// ------------------------------------------------------------------------
- private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
+ private JobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
final String taskName = node.getNodeName();
final DriverStrategy ds = node.getDriverStrategy();
@@ -783,7 +783,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
}
- final AbstractJobVertex vertex;
+ final JobVertex vertex;
final TaskConfig config;
if (chaining) {
@@ -792,7 +792,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
} else {
// create task vertex
- vertex = new AbstractJobVertex(taskName);
+ vertex = new JobVertex(taskName);
vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
config = new TaskConfig(vertex.getConfiguration());
@@ -813,10 +813,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return vertex;
}
- private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
+ private JobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
final String taskName = node.getNodeName();
final DriverStrategy ds = node.getDriverStrategy();
- final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
+ final JobVertex vertex = new JobVertex(taskName);
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
@@ -857,7 +857,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return vertex;
}
- private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
+ private JobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
@@ -871,7 +871,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return vertex;
}
- private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+ private JobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
// get the bulk iteration that corresponds to this partial solution node
final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();
@@ -902,12 +902,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
// create or adopt the head vertex
- final AbstractJobVertex toReturn;
- final AbstractJobVertex headVertex;
+ final JobVertex toReturn;
+ final JobVertex headVertex;
final TaskConfig headConfig;
if (merge) {
final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
- headVertex = (AbstractJobVertex) this.vertices.get(successor);
+ headVertex = (JobVertex) this.vertices.get(successor);
if (headVertex == null) {
throw new CompilerException(
@@ -922,7 +922,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// instantiate the head vertex and give it a no-op driver as the driver strategy.
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
- headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
+ headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")");
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
@@ -939,7 +939,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return toReturn;
}
- private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+ private JobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
// get the bulk iteration that corresponds to this partial solution node
final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
@@ -970,12 +970,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
// create or adopt the head vertex
- final AbstractJobVertex toReturn;
- final AbstractJobVertex headVertex;
+ final JobVertex toReturn;
+ final JobVertex headVertex;
final TaskConfig headConfig;
if (merge) {
final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
- headVertex = (AbstractJobVertex) this.vertices.get(successor);
+ headVertex = (JobVertex) this.vertices.get(successor);
if (headVertex == null) {
throw new CompilerException(
@@ -990,7 +990,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// instantiate the head vertex and give it a no-op driver as the driver strategy.
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
- headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")");
+ headVertex = new JobVertex("IterationHead("+iteration.getNodeName()+")");
headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
@@ -1045,8 +1045,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
* @throws CompilerException
*/
private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
- final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
- final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
+ final JobVertex sourceVertex, final TaskConfig sourceConfig,
+ final JobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
throws CompilerException
{
// ------------ connect the vertices to the job graph --------------
@@ -1187,7 +1187,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private void finalizeBulkIteration(IterationDescriptor descr) {
final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
- final AbstractJobVertex headVertex = descr.getHeadTask();
+ final JobVertex headVertex = descr.getHeadTask();
final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
@@ -1208,7 +1208,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
// --------------------------- create the sync task ---------------------------
- final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")");
+ final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")");
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setParallelism(1);
this.auxVertices.add(sync);
@@ -1232,14 +1232,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
final TaskConfig tailConfig;
- AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
+ JobVertex rootOfStepFunctionVertex = (JobVertex) this.vertices.get(rootOfStepFunction);
if (rootOfStepFunctionVertex == null) {
// last op is chained
final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
if (taskInChain == null) {
throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
}
- rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+ rootOfStepFunctionVertex = (JobVertex) taskInChain.getContainingVertex();
// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
tailConfig = taskInChain.getTaskConfig();
@@ -1262,7 +1262,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig tailConfigOfTerminationCriterion;
// If we have a termination criterion and it is not an intermediate node
if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
- AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
+ JobVertex rootOfTerminationCriterionVertex = (JobVertex) this.vertices.get(rootOfTerminationCriterion);
if (rootOfTerminationCriterionVertex == null) {
@@ -1271,7 +1271,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (taskInChain == null) {
throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
}
- rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+ rootOfTerminationCriterionVertex = (JobVertex) taskInChain.getContainingVertex();
// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
@@ -1312,7 +1312,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private void finalizeWorksetIteration(IterationDescriptor descr) {
final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
- final AbstractJobVertex headVertex = descr.getHeadTask();
+ final JobVertex headVertex = descr.getHeadTask();
final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
@@ -1344,7 +1344,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// --------------------------- create the sync task ---------------------------
final TaskConfig syncConfig;
{
- final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")");
+ final JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")");
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setParallelism(1);
this.auxVertices.add(sync);
@@ -1381,14 +1381,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
{
// get the vertex for the workset update
final TaskConfig worksetTailConfig;
- AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
+ JobVertex nextWorksetVertex = (JobVertex) this.vertices.get(nextWorksetNode);
if (nextWorksetVertex == null) {
// nextWorksetVertex is chained
TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
if (taskInChain == null) {
throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
}
- nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+ nextWorksetVertex = (JobVertex) taskInChain.getContainingVertex();
worksetTailConfig = taskInChain.getTaskConfig();
} else {
worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
@@ -1406,14 +1406,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
{
final TaskConfig solutionDeltaConfig;
- AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
+ JobVertex solutionDeltaVertex = (JobVertex) this.vertices.get(solutionDeltaNode);
if (solutionDeltaVertex == null) {
// last op is chained
TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
if (taskInChain == null) {
throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
}
- solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+ solutionDeltaVertex = (JobVertex) taskInChain.getContainingVertex();
solutionDeltaConfig = taskInChain.getTaskConfig();
} else {
solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
@@ -1501,7 +1501,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private final String taskName;
- private AbstractJobVertex containingVertex;
+ private JobVertex containingVertex;
TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
String taskName) {
@@ -1522,11 +1522,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return this.taskName;
}
- public AbstractJobVertex getContainingVertex() {
+ public JobVertex getContainingVertex() {
return this.containingVertex;
}
- public void setContainingVertex(AbstractJobVertex containingVertex) {
+ public void setContainingVertex(JobVertex containingVertex) {
this.containingVertex = containingVertex;
}
}
@@ -1535,7 +1535,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private final IterationPlanNode iterationNode;
- private AbstractJobVertex headTask;
+ private JobVertex headTask;
private TaskConfig headConfig;
@@ -1552,7 +1552,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return iterationNode;
}
- public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
+ public void setHeadTask(JobVertex headTask, TaskConfig headConfig) {
this.headTask = headTask;
this.headFinalResultConfig = new TaskConfig(new Configuration());
@@ -1564,7 +1564,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
this.headConfig = headConfig;
}
- public AbstractJobVertex getHeadTask() {
+ public JobVertex getHeadTask() {
return headTask;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
index 15cb03f..a2a8d42 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.junit.Test;
@@ -65,7 +65,7 @@ public class TempInIterationsTest {
JobGraph jg = jgg.compileJobGraph(oPlan);
boolean solutionSetUpdateChecked = false;
- for(AbstractJobVertex v : jg.getVertices()) {
+ for(JobVertex v : jg.getVertices()) {
if(v.getName().equals("SolutionSet Delta")) {
// check if input of solution set delta is temped
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 17eb3f6..84cbab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -455,7 +455,7 @@ public class ExecutionGraph implements Serializable {
// Actions
// --------------------------------------------------------------------------------------------
- public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
+ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
@@ -463,7 +463,7 @@ public class ExecutionGraph implements Serializable {
final long createTimestamp = System.currentTimeMillis();
- for (AbstractJobVertex jobVertex : topologiallySorted) {
+ for (JobVertex jobVertex : topologiallySorted) {
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e53bc10..fcc0e9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -57,7 +57,7 @@ public class ExecutionJobVertex implements Serializable {
private final ExecutionGraph graph;
- private final AbstractJobVertex jobVertex;
+ private final JobVertex jobVertex;
private final ExecutionVertex[] taskVertices;
@@ -81,12 +81,12 @@ public class ExecutionJobVertex implements Serializable {
private InputSplitAssigner splitAssigner;
- public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
+ public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
}
- public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
+ public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout, long createTimestamp)
throws JobException
{
@@ -169,7 +169,7 @@ public class ExecutionJobVertex implements Serializable {
return graph;
}
- public AbstractJobVertex getJobVertex() {
+ public JobVertex getJobVertex() {
return jobVertex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
deleted file mode 100644
index 63968b0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-
-import com.google.common.base.Preconditions;
-
-/**
- * An abstract base class for a job vertex.
- */
-public class AbstractJobVertex implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final String DEFAULT_NAME = "(unnamed vertex)";
-
-
- // --------------------------------------------------------------------------------------------
- // Members that define the structure / topology of the graph
- // --------------------------------------------------------------------------------------------
-
- /** The ID of the vertex. */
- private final JobVertexID id;
-
- /** List of produced data sets, one per writer */
- private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
-
- /** List of edges with incoming data. One per Reader. */
- private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
-
- /** Number of subtasks to split this task into at runtime.*/
- private int parallelism = -1;
-
- /** Custom configuration passed to the assigned task at runtime. */
- private Configuration configuration;
-
- /** The class of the invokable. */
- private String invokableClassName;
-
- /** Optionally, a source of input splits */
- private InputSplitSource<?> inputSplitSource;
-
- /** The name of the vertex */
- private String name;
-
- /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
- private SlotSharingGroup slotSharingGroup;
-
- /** The group inside which the vertex subtasks share slots */
- private CoLocationGroup coLocationGroup;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Constructs a new job vertex and assigns it with the given name.
- *
- * @param name The name of the new job vertex.
- */
- public AbstractJobVertex(String name) {
- this(name, null);
- }
-
- /**
- * Constructs a new job vertex and assigns it with the given name.
- *
- * @param name The name of the new job vertex.
- * @param id The id of the job vertex.
- */
- public AbstractJobVertex(String name, JobVertexID id) {
- this.name = name == null ? DEFAULT_NAME : name;
- this.id = id == null ? new JobVertexID() : id;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the ID of this job vertex.
- *
- * @return The ID of this job vertex
- */
- public JobVertexID getID() {
- return this.id;
- }
-
- /**
- * Returns the name of the vertex.
- *
- * @return The name of the vertex.
- */
- public String getName() {
- return this.name;
- }
-
- /**
- * Sets the name of the vertex
- *
- * @param name The new name.
- */
- public void setName(String name) {
- this.name = name == null ? DEFAULT_NAME : name;
- }
-
- /**
- * Returns the number of produced intermediate data sets.
- *
- * @return The number of produced intermediate data sets.
- */
- public int getNumberOfProducedIntermediateDataSets() {
- return this.results.size();
- }
-
- /**
- * Returns the number of inputs.
- *
- * @return The number of inputs.
- */
- public int getNumberOfInputs() {
- return this.inputs.size();
- }
-
- /**
- * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
- *
- * @return the vertex's configuration object
- */
- public Configuration getConfiguration() {
- if (this.configuration == null) {
- this.configuration = new Configuration();
- }
- return this.configuration;
- }
-
- public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
- Preconditions.checkNotNull(invokable);
- this.invokableClassName = invokable.getName();
- }
-
- /**
- * Returns the name of the invokable class which represents the task of this vertex.
- *
- * @return The name of the invokable class, <code>null</code> if not set.
- */
- public String getInvokableClassName() {
- return this.invokableClassName;
- }
-
- /**
- * Returns the invokable class which represents the task of this vertex
- *
- * @param cl The classloader used to resolve user-defined classes
- * @return The invokable class, <code>null</code> if it is not set
- */
- public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
- if (cl == null) {
- throw new NullPointerException("The classloader must not be null.");
- }
- if (invokableClassName == null) {
- return null;
- }
-
- try {
- return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
- }
- catch (ClassNotFoundException e) {
- throw new RuntimeException("The user-code class could not be resolved.", e);
- }
- catch (ClassCastException e) {
- throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
- }
- }
-
- /**
- * Gets the parallelism of the task.
- *
- * @return The parallelism of the task.
- */
- public int getParallelism() {
- return parallelism;
- }
-
- /**
- * Sets the parallelism for the task.
- *
- * @param parallelism The parallelism for the task.
- */
- public void setParallelism(int parallelism) {
- if (parallelism < 1) {
- throw new IllegalArgumentException("The parallelism must be at least one.");
- }
- this.parallelism = parallelism;
- }
-
- public InputSplitSource<?> getInputSplitSource() {
- return inputSplitSource;
- }
-
- public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
- this.inputSplitSource = inputSplitSource;
- }
-
- public List<IntermediateDataSet> getProducedDataSets() {
- return this.results;
- }
-
- public List<JobEdge> getInputs() {
- return this.inputs;
- }
-
- /**
- * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
- * slot sharing group can run one subtask each in the same slot.
- *
- * @param grp The slot sharing group to associate the vertex with.
- */
- public void setSlotSharingGroup(SlotSharingGroup grp) {
- if (this.slotSharingGroup != null) {
- this.slotSharingGroup.removeVertexFromGroup(id);
- }
-
- this.slotSharingGroup = grp;
- if (grp != null) {
- grp.addVertexToGroup(id);
- }
- }
-
- /**
- * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
- * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
- * a slot sharing group, this method returns {@code null}.
- *
- * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
- */
- public SlotSharingGroup getSlotSharingGroup() {
- return slotSharingGroup;
- }
-
- /**
- * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
- * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
- * instance (TaskManager) as the n'th subtask of the given vertex.
- *
- * NOTE: Co-location is only possible between vertices in a slot sharing group.
- *
- * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
- * respective vertex must be a (transitive) input of this vertex.
- *
- * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
- *
- * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
- * slot sharing group.
- *
- * @see #setSlotSharingGroup(SlotSharingGroup)
- */
- public void setStrictlyCoLocatedWith(AbstractJobVertex strictlyCoLocatedWith) {
- if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
- throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
- }
-
- CoLocationGroup thisGroup = this.coLocationGroup;
- CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
-
- if (otherGroup == null) {
- if (thisGroup == null) {
- CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
- this.coLocationGroup = group;
- strictlyCoLocatedWith.coLocationGroup = group;
- }
- else {
- thisGroup.addVertex(strictlyCoLocatedWith);
- strictlyCoLocatedWith.coLocationGroup = thisGroup;
- }
- }
- else {
- if (thisGroup == null) {
- otherGroup.addVertex(this);
- this.coLocationGroup = otherGroup;
- }
- else {
- // both had yet distinct groups, we need to merge them
- thisGroup.mergeInto(otherGroup);
- }
- }
- }
-
- public CoLocationGroup getCoLocationGroup() {
- return coLocationGroup;
- }
-
- public void updateCoLocationGroup(CoLocationGroup group) {
- this.coLocationGroup = group;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
- return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
- }
-
- public IntermediateDataSet createAndAddResultDataSet(
- IntermediateDataSetID id,
- ResultPartitionType partitionType) {
-
- IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
- this.results.add(result);
- return result;
- }
-
- public void connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
- JobEdge edge = new JobEdge(dataSet, this, distPattern);
- this.inputs.add(edge);
- dataSet.addConsumer(edge);
- }
-
- public void connectNewDataSetAsInput(AbstractJobVertex input, DistributionPattern distPattern) {
- connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
- }
-
- public void connectNewDataSetAsInput(
- AbstractJobVertex input,
- DistributionPattern distPattern,
- ResultPartitionType partitionType) {
-
- IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
- JobEdge edge = new JobEdge(dataSet, this, distPattern);
- this.inputs.add(edge);
- dataSet.addConsumer(edge);
- }
-
- public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
- JobEdge edge = new JobEdge(dataSetId, this, distPattern);
- this.inputs.add(edge);
- }
-
- // --------------------------------------------------------------------------------------------
-
- public boolean isInputVertex() {
- return this.inputs.isEmpty();
- }
-
- public boolean isOutputVertex() {
- return this.results.isEmpty();
- }
-
- public boolean hasNoConnectedInputs() {
- for (JobEdge edge : inputs) {
- if (!edge.isIdReference()) {
- return false;
- }
- }
-
- return true;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * A hook that can be overwritten by sub classes to implement logic that is called by the
- * master when the job starts.
- *
- * @param loader The class loader for user defined code.
- * @throws Exception The method may throw exceptions which cause the job to fail immediately.
- */
- public void initializeOnMaster(ClassLoader loader) throws Exception {}
-
- /**
- * A hook that can be overwritten by sub classes to implement logic that is called by the
- * master after the job completed.
- *
- * @param loader The class loader for user defined code.
- * @throws Exception The method may throw exceptions which cause the job to fail immediately.
- */
- public void finalizeOnMaster(ClassLoader loader) throws Exception {}
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return this.name + " (" + this.invokableClassName + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 0ea0da7..011850c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.runtime.operators.util.TaskConfig;
-public class InputFormatVertex extends AbstractJobVertex {
+public class InputFormatVertex extends JobVertex {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 86888e2..7c8f32b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -39,7 +39,7 @@ public class IntermediateDataSet implements java.io.Serializable {
private final IntermediateDataSetID id; // the identifier
- private final AbstractJobVertex producer; // the operation that produced this data set
+ private final JobVertex producer; // the operation that produced this data set
private final List<JobEdge> consumers = new ArrayList<JobEdge>();
@@ -48,15 +48,15 @@ public class IntermediateDataSet implements java.io.Serializable {
// --------------------------------------------------------------------------------------------
- public IntermediateDataSet(AbstractJobVertex producer) {
+ public IntermediateDataSet(JobVertex producer) {
this(new IntermediateDataSetID(), producer);
}
- public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
+ public IntermediateDataSet(IntermediateDataSetID id, JobVertex producer) {
this(id, ResultPartitionType.PIPELINED, producer);
}
- public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, AbstractJobVertex producer) {
+ public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
this.resultType = checkNotNull(resultType);
@@ -68,7 +68,7 @@ public class IntermediateDataSet implements java.io.Serializable {
return id;
}
- public AbstractJobVertex getProducer() {
+ public JobVertex getProducer() {
return producer;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 939f6c4..5faea83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -29,7 +29,7 @@ public class JobEdge implements java.io.Serializable {
/** The vertex connected to this edge. */
- private final AbstractJobVertex target;
+ private final JobVertex target;
/** The distribution pattern that should be used for this job edge. */
private final DistributionPattern distributionPattern;
@@ -47,7 +47,7 @@ public class JobEdge implements java.io.Serializable {
* @param target The operation that is at the target of this edge.
* @param distributionPattern The pattern that defines how the connection behaves in parallel.
*/
- public JobEdge(IntermediateDataSet source, AbstractJobVertex target, DistributionPattern distributionPattern) {
+ public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
if (source == null || target == null || distributionPattern == null) {
throw new NullPointerException();
}
@@ -65,7 +65,7 @@ public class JobEdge implements java.io.Serializable {
* @param target The operation that is at the target of this edge.
* @param distributionPattern The pattern that defines how the connection behaves in parallel.
*/
- public JobEdge(IntermediateDataSetID sourceId, AbstractJobVertex target, DistributionPattern distributionPattern) {
+ public JobEdge(IntermediateDataSetID sourceId, JobVertex target, DistributionPattern distributionPattern) {
if (sourceId == null || target == null || distributionPattern == null) {
throw new NullPointerException();
}
@@ -90,7 +90,7 @@ public class JobEdge implements java.io.Serializable {
*
* @return The vertex connected to this edge.
*/
- public AbstractJobVertex getTarget() {
+ public JobVertex getTarget() {
return target;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 28fa78e..09b415b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -60,7 +60,7 @@ public class JobGraph implements Serializable {
// --------------------------------------------------------------------------------------------
/** List of task vertices included in this job graph. */
- private final Map<JobVertexID, AbstractJobVertex> taskVertices = new LinkedHashMap<JobVertexID, AbstractJobVertex>();
+ private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();
@@ -124,7 +124,7 @@ public class JobGraph implements Serializable {
*
* @param vertices The vertices to add to the graph.
*/
- public JobGraph(AbstractJobVertex... vertices) {
+ public JobGraph(JobVertex... vertices) {
this(null, vertices);
}
@@ -134,7 +134,7 @@ public class JobGraph implements Serializable {
* @param jobName The name of the job.
* @param vertices The vertices to add to the graph.
*/
- public JobGraph(String jobName, AbstractJobVertex... vertices) {
+ public JobGraph(String jobName, JobVertex... vertices) {
this(null, jobName, vertices);
}
@@ -145,10 +145,10 @@ public class JobGraph implements Serializable {
* @param jobName The name of the job.
* @param vertices The vertices to add to the graph.
*/
- public JobGraph(JobID jobId, String jobName, AbstractJobVertex... vertices) {
+ public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
this(jobId, jobName);
- for (AbstractJobVertex vertex : vertices) {
+ for (JobVertex vertex : vertices) {
addVertex(vertex);
}
}
@@ -229,9 +229,9 @@ public class JobGraph implements Serializable {
* @param vertex
* the new task vertex to be added
*/
- public void addVertex(AbstractJobVertex vertex) {
+ public void addVertex(JobVertex vertex) {
final JobVertexID id = vertex.getID();
- AbstractJobVertex previous = taskVertices.put(id, vertex);
+ JobVertex previous = taskVertices.put(id, vertex);
// if we had a prior association, restore and throw an exception
if (previous != null) {
@@ -245,7 +245,7 @@ public class JobGraph implements Serializable {
*
* @return an Iterable to iterate all vertices registered with the job graph
*/
- public Iterable<AbstractJobVertex> getVertices() {
+ public Iterable<JobVertex> getVertices() {
return this.taskVertices.values();
}
@@ -255,8 +255,8 @@ public class JobGraph implements Serializable {
*
* @return an array of all job vertices that are registered with the job graph
*/
- public AbstractJobVertex[] getVerticesAsArray() {
- return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]);
+ public JobVertex[] getVerticesAsArray() {
+ return this.taskVertices.values().toArray(new JobVertex[this.taskVertices.size()]);
}
/**
@@ -295,27 +295,27 @@ public class JobGraph implements Serializable {
* the ID of the vertex to search for
* @return the vertex with the matching ID or <code>null</code> if no vertex with such ID could be found
*/
- public AbstractJobVertex findVertexByID(JobVertexID id) {
+ public JobVertex findVertexByID(JobVertexID id) {
return this.taskVertices.get(id);
}
// --------------------------------------------------------------------------------------------
- public List<AbstractJobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
+ public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
// early out on empty lists
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
- List<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
- Set<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
+ List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
+ Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
{
- Iterator<AbstractJobVertex> iter = remaining.iterator();
+ Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
- AbstractJobVertex vertex = iter.next();
+ JobVertex vertex = iter.next();
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
@@ -335,21 +335,21 @@ public class JobGraph implements Serializable {
throw new InvalidProgramException("The job graph is cyclic.");
}
- AbstractJobVertex current = sorted.get(startNodePos++);
+ JobVertex current = sorted.get(startNodePos++);
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
- private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, List<AbstractJobVertex> target, Set<AbstractJobVertex> remaining) {
+ private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
for (JobEdge edge : dataSet.getConsumers()) {
// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
- AbstractJobVertex v = edge.getTarget();
+ JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
new file mode 100644
index 0000000..4bf9fc4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The base class for job vertices.
+ */
+public class JobVertex implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String DEFAULT_NAME = "(unnamed vertex)";
+
+
+ // --------------------------------------------------------------------------------------------
+ // Members that define the structure / topology of the graph
+ // --------------------------------------------------------------------------------------------
+
+ /** The ID of the vertex. */
+ private final JobVertexID id;
+
+ /** List of produced data sets, one per writer */
+ private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
+
+ /** List of edges with incoming data. One per Reader. */
+ private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
+
+ /** Number of subtasks to split this task into at runtime.*/
+ private int parallelism = -1;
+
+ /** Custom configuration passed to the assigned task at runtime. */
+ private Configuration configuration;
+
+ /** The class of the invokable. */
+ private String invokableClassName;
+
+ /** Optionally, a source of input splits */
+ private InputSplitSource<?> inputSplitSource;
+
+ /** The name of the vertex */
+ private String name;
+
+ /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
+ private SlotSharingGroup slotSharingGroup;
+
+ /** The group inside which the vertex subtasks share slots */
+ private CoLocationGroup coLocationGroup;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Constructs a new job vertex and assigns it with the given name.
+ *
+ * @param name The name of the new job vertex.
+ */
+ public JobVertex(String name) {
+ this(name, null);
+ }
+
+ /**
+ * Constructs a new job vertex and assigns it with the given name.
+ *
+ * @param name The name of the new job vertex.
+ * @param id The id of the job vertex.
+ */
+ public JobVertex(String name, JobVertexID id) {
+ this.name = name == null ? DEFAULT_NAME : name;
+ this.id = id == null ? new JobVertexID() : id;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the ID of this job vertex.
+ *
+ * @return The ID of this job vertex
+ */
+ public JobVertexID getID() {
+ return this.id;
+ }
+
+ /**
+ * Returns the name of the vertex.
+ *
+ * @return The name of the vertex.
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Sets the name of the vertex
+ *
+ * @param name The new name.
+ */
+ public void setName(String name) {
+ this.name = name == null ? DEFAULT_NAME : name;
+ }
+
+ /**
+ * Returns the number of produced intermediate data sets.
+ *
+ * @return The number of produced intermediate data sets.
+ */
+ public int getNumberOfProducedIntermediateDataSets() {
+ return this.results.size();
+ }
+
+ /**
+ * Returns the number of inputs.
+ *
+ * @return The number of inputs.
+ */
+ public int getNumberOfInputs() {
+ return this.inputs.size();
+ }
+
+ /**
+ * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
+ *
+ * @return the vertex's configuration object
+ */
+ public Configuration getConfiguration() {
+ if (this.configuration == null) {
+ this.configuration = new Configuration();
+ }
+ return this.configuration;
+ }
+
+ public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
+ Preconditions.checkNotNull(invokable);
+ this.invokableClassName = invokable.getName();
+ }
+
+ /**
+ * Returns the name of the invokable class which represents the task of this vertex.
+ *
+ * @return The name of the invokable class, <code>null</code> if not set.
+ */
+ public String getInvokableClassName() {
+ return this.invokableClassName;
+ }
+
+ /**
+ * Returns the invokable class which represents the task of this vertex
+ *
+ * @param cl The classloader used to resolve user-defined classes
+ * @return The invokable class, <code>null</code> if it is not set
+ */
+ public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
+ if (cl == null) {
+ throw new NullPointerException("The classloader must not be null.");
+ }
+ if (invokableClassName == null) {
+ return null;
+ }
+
+ try {
+ return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException("The user-code class could not be resolved.", e);
+ }
+ catch (ClassCastException e) {
+ throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
+ }
+ }
+
+ /**
+ * Gets the parallelism of the task.
+ *
+ * @return The parallelism of the task.
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * Sets the parallelism for the task.
+ *
+ * @param parallelism The parallelism for the task.
+ */
+ public void setParallelism(int parallelism) {
+ if (parallelism < 1) {
+ throw new IllegalArgumentException("The parallelism must be at least one.");
+ }
+ this.parallelism = parallelism;
+ }
+
+ public InputSplitSource<?> getInputSplitSource() {
+ return inputSplitSource;
+ }
+
+ public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
+ this.inputSplitSource = inputSplitSource;
+ }
+
+ public List<IntermediateDataSet> getProducedDataSets() {
+ return this.results;
+ }
+
+ public List<JobEdge> getInputs() {
+ return this.inputs;
+ }
+
+ /**
+ * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
+ * slot sharing group can run one subtask each in the same slot.
+ *
+ * @param grp The slot sharing group to associate the vertex with.
+ */
+ public void setSlotSharingGroup(SlotSharingGroup grp) {
+ if (this.slotSharingGroup != null) {
+ this.slotSharingGroup.removeVertexFromGroup(id);
+ }
+
+ this.slotSharingGroup = grp;
+ if (grp != null) {
+ grp.addVertexToGroup(id);
+ }
+ }
+
+ /**
+ * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
+ * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
+ * a slot sharing group, this method returns {@code null}.
+ *
+ * @return The slot sharing group this vertex is associated with, or {@code null}, if not associated with one.
+ */
+ public SlotSharingGroup getSlotSharingGroup() {
+ return this.slotSharingGroup;
+ }
+
+ /**
+ * Tells this vertex to strictly co-locate its subtasks with the subtasks of the given vertex.
+ * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
+ * instance (TaskManager) as the n'th subtask of the given vertex.
+ *
+ * NOTE: Co-location is only possible between vertices in a slot sharing group.
+ *
+ * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
+ * respective vertex must be a (transitive) input of this vertex.
+ *
+ * If both vertices already belong to the same co-location group, the call leaves the groups unchanged.
+ *
+ * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
+ *
+ * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
+ * slot sharing group.
+ *
+ * @see #setSlotSharingGroup(SlotSharingGroup)
+ */
+ public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
+ if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
+ throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
+ }
+
+ CoLocationGroup thisGroup = this.coLocationGroup;
+ CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
+
+ if (otherGroup == null) {
+ if (thisGroup == null) {
+ // neither vertex has a group yet: create a fresh one holding both
+ CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
+ this.coLocationGroup = group;
+ strictlyCoLocatedWith.coLocationGroup = group;
+ }
+ else {
+ // only this vertex has a group: pull the other vertex into it
+ thisGroup.addVertex(strictlyCoLocatedWith);
+ strictlyCoLocatedWith.coLocationGroup = thisGroup;
+ }
+ }
+ else {
+ if (thisGroup == null) {
+ // only the other vertex has a group: join it
+ otherGroup.addVertex(this);
+ this.coLocationGroup = otherGroup;
+ }
+ else if (thisGroup != otherGroup) {
+ // both vertices are in distinct groups, so the groups need to be merged
+ thisGroup.mergeInto(otherGroup);
+ }
+ // else: both vertices already share one group. Merging a group into itself is at best
+ // redundant, so nothing is done in that case.
+ }
+ }
+
+ /**
+ * Gets the co-location group this vertex currently references.
+ *
+ * @return The co-location group of this vertex, or {@code null}, if none is set.
+ */
+ public CoLocationGroup getCoLocationGroup() {
+ return this.coLocationGroup;
+ }
+
+ /**
+ * Replaces the co-location group reference held by this vertex. Only the reference stored in this
+ * vertex changes; this method does not add the vertex to, or remove it from, any group.
+ *
+ * @param group The co-location group this vertex should reference from now on.
+ */
+ public void updateCoLocationGroup(CoLocationGroup group) {
+ coLocationGroup = group;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new intermediate data set under a newly created {@link IntermediateDataSetID} and
+ * registers it as a result of this vertex.
+ *
+ * @param partitionType The partition type of the new data set.
+ * @return The newly created data set.
+ */
+ public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
+ final IntermediateDataSetID freshId = new IntermediateDataSetID();
+ return createAndAddResultDataSet(freshId, partitionType);
+ }
+
+ /**
+ * Creates a new intermediate data set with the given id, produced by this vertex, and appends
+ * it to the list of results of this vertex.
+ *
+ * @param id The id of the new data set.
+ * @param partitionType The partition type of the new data set.
+ * @return The newly created data set.
+ */
+ public IntermediateDataSet createAndAddResultDataSet(
+ IntermediateDataSetID id,
+ ResultPartitionType partitionType) {
+
+ final IntermediateDataSet dataSet = new IntermediateDataSet(id, partitionType, this);
+ results.add(dataSet);
+ return dataSet;
+ }
+
+ /**
+ * Connects an existing intermediate data set as an input of this vertex. A new edge from the
+ * data set to this vertex is created, added to the inputs of this vertex, and registered
+ * as a consumer of the data set.
+ *
+ * @param dataSet The data set to consume.
+ * @param distPattern The distribution pattern of the new edge.
+ */
+ public void connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
+ final JobEdge connection = new JobEdge(dataSet, this, distPattern);
+ inputs.add(connection);
+ dataSet.addConsumer(connection);
+ }
+
+ /**
+ * Connects the given vertex as an input of this vertex through a newly created data set,
+ * using {@link ResultPartitionType#PIPELINED} as the partition type.
+ *
+ * @param input The vertex that produces the new data set.
+ * @param distPattern The distribution pattern of the new edge.
+ */
+ public void connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
+ final ResultPartitionType defaultType = ResultPartitionType.PIPELINED;
+ connectNewDataSetAsInput(input, distPattern, defaultType);
+ }
+
+ /**
+ * Connects the given vertex as an input of this vertex. A new result data set is created on
+ * the input vertex; an edge from that data set to this vertex is then added to the inputs of
+ * this vertex and registered as a consumer of the data set.
+ *
+ * @param input The vertex that produces the new data set.
+ * @param distPattern The distribution pattern of the new edge.
+ * @param partitionType The partition type of the new data set.
+ */
+ public void connectNewDataSetAsInput(
+ JobVertex input,
+ DistributionPattern distPattern,
+ ResultPartitionType partitionType) {
+
+ final IntermediateDataSet produced = input.createAndAddResultDataSet(partitionType);
+ final JobEdge connection = new JobEdge(produced, this, distPattern);
+ inputs.add(connection);
+ produced.addConsumer(connection);
+ }
+
+ /**
+ * Adds an input edge that refers to its source data set only by id. Unlike the other connect
+ * methods of this class, no consumer is registered on a data set object here.
+ *
+ * @param dataSetId The id of the data set to consume.
+ * @param distPattern The distribution pattern of the new edge.
+ */
+ public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
+ final JobEdge idEdge = new JobEdge(dataSetId, this, distPattern);
+ inputs.add(idEdge);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Checks whether this vertex has no input edges at all.
+ *
+ * @return {@code true}, if the list of inputs is empty, {@code false} otherwise.
+ */
+ public boolean isInputVertex() {
+ return inputs.isEmpty();
+ }
+
+ /**
+ * Checks whether this vertex produces no intermediate data sets.
+ *
+ * @return {@code true}, if the list of produced data sets is empty, {@code false} otherwise.
+ */
+ public boolean isOutputVertex() {
+ return results.isEmpty();
+ }
+
+ /**
+ * Checks whether every input edge of this vertex is an id reference. A vertex without any
+ * input edges also satisfies this check.
+ *
+ * @return {@code false}, if at least one input edge is not an id reference, {@code true} otherwise.
+ */
+ public boolean hasNoConnectedInputs() {
+ for (JobEdge input : this.inputs) {
+ if (input.isIdReference()) {
+ continue;
+ }
+ // found an edge that is not a pure id reference
+ return false;
+ }
+
+ return true;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A hook that can be overwritten by sub classes to implement logic that is called by the
+ * master when the job starts.
+ *
+ * @param loader The class loader for user defined code.
+ * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+ */
+ public void initializeOnMaster(ClassLoader loader) throws Exception {}
+
+ /**
+ * A hook that can be overwritten by sub classes to implement logic that is called by the
+ * master after the job completed.
+ *
+ * @param loader The class loader for user defined code.
+ * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+ */
+ public void finalizeOnMaster(ClassLoader loader) throws Exception {}
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Renders this vertex as its name followed by the invokable class name in parentheses.
+ */
+ @Override
+ public String toString() {
+ final String invokable = this.invokableClassName;
+ return name + " (" + invokable + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 2a1f89c..c9ac564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
* A task vertex that run an initialization on the master, trying to deserialize an output format
* and initializing it on master, if necessary.
*/
-public class OutputFormatVertex extends AbstractJobVertex {
+public class OutputFormatVertex extends JobVertex {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index a70cd2b..85dd5c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -108,9 +108,9 @@ public abstract class AbstractInvokable {
}
/**
- * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}.
+ * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}.
*
- * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}
+ * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
*/
public Configuration getTaskConfiguration() {
return this.environment.getTaskConfiguration();
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index a88c89d..74e34e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import com.google.common.base.Preconditions;
@@ -43,7 +43,7 @@ public class CoLocationGroup implements java.io.Serializable {
private final AbstractID id = new AbstractID();
/** The vertices participating in the co-location group */
- private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>();
+ private final List<JobVertex> vertices = new ArrayList<JobVertex>();
/** The constraints, which hold the shared slots for the co-located operators */
private transient ArrayList<CoLocationConstraint> constraints;
@@ -52,15 +52,15 @@ public class CoLocationGroup implements java.io.Serializable {
public CoLocationGroup() {}
- public CoLocationGroup(AbstractJobVertex... vertices) {
- for (AbstractJobVertex v : vertices) {
+ public CoLocationGroup(JobVertex... vertices) {
+ for (JobVertex v : vertices) {
this.vertices.add(v);
}
}
// --------------------------------------------------------------------------------------------
- public void addVertex(AbstractJobVertex vertex) {
+ public void addVertex(JobVertex vertex) {
Preconditions.checkNotNull(vertex);
this.vertices.add(vertex);
}
@@ -68,7 +68,7 @@ public class CoLocationGroup implements java.io.Serializable {
public void mergeInto(CoLocationGroup other) {
Preconditions.checkNotNull(other);
- for (AbstractJobVertex v : this.vertices) {
+ for (JobVertex v : this.vertices) {
v.updateCoLocationGroup(other);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 3d9a155..932e366 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -24,7 +24,7 @@ import akka.pattern.Patterns;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -55,7 +55,7 @@ public class CoordinatorShutdownTest {
cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
// build a test graph with snapshotting enabled
- AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(Tasks.NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
@@ -102,7 +102,7 @@ public class CoordinatorShutdownTest {
cluster = new LocalFlinkMiniCluster(new Configuration(), true);
// build a test graph with snapshotting enabled
- AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(Tasks.NoOpInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 4d10585..693e014 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import java.util.Arrays;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -32,10 +32,10 @@ public class AllVerticesIteratorTest {
public void testAllVertices() {
try {
- AbstractJobVertex v1 = new AbstractJobVertex("v1");
- AbstractJobVertex v2 = new AbstractJobVertex("v2");
- AbstractJobVertex v3 = new AbstractJobVertex("v3");
- AbstractJobVertex v4 = new AbstractJobVertex("v4");
+ JobVertex v1 = new JobVertex("v1");
+ JobVertex v2 = new JobVertex("v2");
+ JobVertex v3 = new JobVertex("v3");
+ JobVertex v4 = new JobVertex("v4");
v1.setParallelism(1);
v2.setParallelism(7);