Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/16 17:20:10 UTC

[3/3] flink git commit: [FLINK-2120][runtime] rename AbstractJobVertex to JobVertex

[FLINK-2120][runtime] rename AbstractJobVertex to JobVertex

This closes #840.
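
For downstream code the change is a pure rename: package, constructors and methods stay the same, only the class name differs. A minimal before/after sketch of the migration, mirroring the ClientConnectionTest update below (TestInvokable stands in for whatever AbstractInvokable subclass the caller already uses):

    // before this commit
    import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
    vertex.setInvokableClass(TestInvokable.class);
    JobGraph jg = new JobGraph("Test Job", vertex);

    // after this commit
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    JobVertex vertex = new JobVertex("Test Vertex");
    vertex.setInvokableClass(TestInvokable.class);
    JobGraph jg = new JobGraph("Test Job", vertex);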


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bd0af73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bd0af73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bd0af73

Branch: refs/heads/master
Commit: 1bd0af73892c812e6d16c5661cf10a9e2c13b3e4
Parents: 20d7e5a
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Jun 15 22:12:47 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Jun 16 17:18:45 2015 +0200

----------------------------------------------------------------------
 .../client/program/ClientConnectionTest.java    |   4 +-
 .../plantranslate/JobGraphGenerator.java        | 102 ++---
 .../plantranslate/TempInIterationsTest.java     |   4 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../executiongraph/ExecutionJobVertex.java      |  10 +-
 .../runtime/jobgraph/AbstractJobVertex.java     | 405 -------------------
 .../runtime/jobgraph/InputFormatVertex.java     |   2 +-
 .../runtime/jobgraph/IntermediateDataSet.java   |  10 +-
 .../apache/flink/runtime/jobgraph/JobEdge.java  |   8 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  38 +-
 .../flink/runtime/jobgraph/JobVertex.java       | 405 +++++++++++++++++++
 .../runtime/jobgraph/OutputFormatVertex.java    |   2 +-
 .../jobgraph/tasks/AbstractInvokable.java       |   4 +-
 .../jobmanager/scheduler/CoLocationGroup.java   |  12 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   6 +-
 .../executiongraph/AllVerticesIteratorTest.java |  10 +-
 .../ExecutionGraphConstructionTest.java         | 102 ++---
 .../ExecutionGraphDeploymentTest.java           |  38 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../executiongraph/LocalInputSplitsTest.java    |   6 +-
 .../executiongraph/PointwisePatternTest.java    |  44 +-
 .../TerminalStateDeadlockTest.java              |   8 +-
 .../VertexLocationConstraintTest.java           |  18 +-
 .../executiongraph/VertexSlotSharingTest.java   |  14 +-
 .../flink/runtime/instance/SharedSlotsTest.java |  12 +-
 .../PartialConsumePipelinedResultTest.java      |   6 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    |  74 ++--
 .../runtime/jobgraph/JobTaskVertexTest.java     |  10 +-
 .../runtime/jobmanager/JobManagerTest.java      |   4 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   6 +-
 .../SlotCountExceedingParallelismTest.java      |   6 +-
 .../scheduler/CoLocationConstraintTest.java     |   8 +-
 .../ScheduleOrUpdateConsumersTest.java          |   8 +-
 .../runtime/taskmanager/TaskCancelTest.java     |  10 +-
 .../ExecutionGraphRestartTest.scala             |   6 +-
 .../TaskManagerLossFailsTasksTest.scala         |   4 +-
 .../jobmanager/CoLocationConstraintITCase.scala |   7 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  59 ++-
 .../runtime/jobmanager/RecoveryITCase.scala     |  16 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |  12 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  10 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  20 +-
 .../apache/flink/streaming/api/IterateTest.java |  12 +-
 .../streaming/api/graph/SlotAllocationTest.java |   4 +-
 .../BroadcastVarsNepheleITCase.java             |   8 +-
 .../KMeansIterativeNepheleITCase.java           |  26 +-
 .../JobSubmissionFailsITCase.java               |   8 +-
 .../ConnectedComponentsNepheleITCase.java       |  58 +--
 .../IterationWithChainingNepheleITCase.java     |   8 +-
 .../test/iterative/nephele/JobGraphUtils.java   |  12 +-
 .../CustomCompensatableDanglingPageRank.java    |  10 +-
 ...mpensatableDanglingPageRankWithCombiner.java |  10 +-
 .../CompensatableDanglingPageRank.java          |  10 +-
 .../runtime/NetworkStackThroughputITCase.java   |  10 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   6 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  14 +-
 57 files changed, 869 insertions(+), 871 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index be6c19a..39b74a3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.net.NetUtils;
@@ -87,7 +87,7 @@ public class ClientConnectionTest {
 		config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s");
 
 		try {
-			AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(TestInvokable.class);
 
 			final JobGraph jg = new JobGraph("Test Job", vertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 2630019..109be20 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +110,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	// ------------------------------------------------------------------------
 
-	private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to job vertices
+	private Map<PlanNode, JobVertex> vertices; // a map from optimizer nodes to job vertices
 	
 	private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to job vertices
 	
@@ -118,7 +118,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	private List<TaskInChain> chainedTasksInSequence;
 	
-	private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
+	private List<JobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
 	
 	private final int defaultMaxFan;
 	
@@ -157,10 +157,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	 * @return JobGraph generated frmo the plan.
 	 */
 	public JobGraph compileJobGraph(OptimizedPlan program) {
-		this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
+		this.vertices = new HashMap<PlanNode, JobVertex>();
 		this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
 		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
-		this.auxVertices = new ArrayList<AbstractJobVertex>();
+		this.auxVertices = new ArrayList<JobVertex>();
 		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
 		this.iterationStack = new ArrayList<IterationPlanNode>();
 		
@@ -198,11 +198,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		graph.setAllowQueuedScheduling(false);
 		
 		// add vertices to the graph
-		for (AbstractJobVertex vertex : this.vertices.values()) {
+		for (JobVertex vertex : this.vertices.values()) {
 			graph.addVertex(vertex);
 		}
 		
-		for (AbstractJobVertex vertex : this.auxVertices) {
+		for (JobVertex vertex : this.auxVertices) {
 			graph.addVertex(vertex);
 			vertex.setSlotSharingGroup(sharingGroup);
 		}
@@ -251,7 +251,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		}
 
 		// the vertex to be created for the current node
-		final AbstractJobVertex vertex;
+		final JobVertex vertex;
 		try {
 			if (node instanceof SinkPlanNode) {
 				vertex = createDataSinkVertex((SinkPlanNode) node);
@@ -322,7 +322,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				// we adjust the joins / cogroups that go into the solution set here
 				for (Channel c : node.getOutgoingChannels()) {
 					DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
-					AbstractJobVertex accessingVertex = this.vertices.get(target);
+					JobVertex accessingVertex = this.vertices.get(target);
 					TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());
 					int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1;
 					
@@ -438,7 +438,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				if (node instanceof WorksetIterationPlanNode) {
 					// connect the initial solution set
 					WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node;
-					AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
+					JobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
 					TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
 					int inputIndex = headConfig.getDriverStrategy().getNumInputs();
 					headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
@@ -448,7 +448,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				return;
 			}
 			
-			final AbstractJobVertex targetVertex = this.vertices.get(node);
+			final JobVertex targetVertex = this.vertices.get(node);
 			
 			
 			// --------- Main Path: Translation of channels ----------
@@ -480,7 +480,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 						throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
 					}
 	
-					AbstractJobVertex container = chainedTask.getContainingVertex();
+					JobVertex container = chainedTask.getContainingVertex();
 					
 					if (container == null) {
 						final PlanNode sourceNode = inConn.getSource();
@@ -526,7 +526,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			
 			
 			if (this.currentIteration != null) {
-				AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
+				JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
 				// the head may still be null if we descend into the static parts first
 				if (head != null) {
 					targetVertex.setStrictlyCoLocatedWith(head);
@@ -580,7 +580,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		}
 	}
 	
-	private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex,
+	private int translateChannel(Channel input, int inputIndex, JobVertex targetVertex,
 			TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
 	{
 		final PlanNode inputPlanNode = input.getSource();
@@ -656,7 +656,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 			
 			final PlanNode sourceNode = inConn.getSource();
-			AbstractJobVertex sourceVertex = this.vertices.get(sourceNode);
+			JobVertex sourceVertex = this.vertices.get(sourceNode);
 			TaskConfig sourceVertexConfig;
 
 			if (sourceVertex == null) {
@@ -740,7 +740,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	// Methods for creating individual vertices
 	// ------------------------------------------------------------------------
 	
-	private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
+	private JobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
 		final String taskName = node.getNodeName();
 		final DriverStrategy ds = node.getDriverStrategy();
 		
@@ -783,7 +783,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 		}
 		
-		final AbstractJobVertex vertex;
+		final JobVertex vertex;
 		final TaskConfig config;
 		
 		if (chaining) {
@@ -792,7 +792,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
 		} else {
 			// create task vertex
-			vertex = new AbstractJobVertex(taskName);
+			vertex = new JobVertex(taskName);
 			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
 			
 			config = new TaskConfig(vertex.getConfiguration());
@@ -813,10 +813,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		return vertex;
 	}
 
-	private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
+	private JobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
 		final String taskName = node.getNodeName();
 		final DriverStrategy ds = node.getDriverStrategy();
-		final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
+		final JobVertex vertex = new JobVertex(taskName);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
 		
@@ -857,7 +857,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		return vertex;
 	}
 
-	private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
+	private JobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
 		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
@@ -871,7 +871,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		return vertex;
 	}
 	
-	private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+	private JobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
 		// get the bulk iteration that corresponds to this partial solution node
 		final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();
 		
@@ -902,12 +902,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		}
 		
 		// create or adopt the head vertex
-		final AbstractJobVertex toReturn;
-		final AbstractJobVertex headVertex;
+		final JobVertex toReturn;
+		final JobVertex headVertex;
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+			headVertex = (JobVertex) this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
@@ -922,7 +922,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
-			headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
+			headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")");
 			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
@@ -939,7 +939,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		return toReturn;
 	}
 	
-	private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+	private JobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
 		// get the bulk iteration that corresponds to this partial solution node
 		final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
 		
@@ -970,12 +970,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		}
 		
 		// create or adopt the head vertex
-		final AbstractJobVertex toReturn;
-		final AbstractJobVertex headVertex;
+		final JobVertex toReturn;
+		final JobVertex headVertex;
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+			headVertex = (JobVertex) this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
@@ -990,7 +990,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
-			headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")");
+			headVertex = new JobVertex("IterationHead("+iteration.getNodeName()+")");
 			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
@@ -1045,8 +1045,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	 * @throws CompilerException
 	 */
 	private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
-			final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
-			final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
+			final JobVertex sourceVertex, final TaskConfig sourceConfig,
+			final JobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
 	throws CompilerException
 	{
 		// ------------ connect the vertices to the job graph --------------
@@ -1187,7 +1187,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	private void finalizeBulkIteration(IterationDescriptor descr) {
 		
 		final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
-		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final JobVertex headVertex = descr.getHeadTask();
 		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
 		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
 		
@@ -1208,7 +1208,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
 		
 		// --------------------------- create the sync task ---------------------------
-		final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")");
+		final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")");
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setParallelism(1);
 		this.auxVertices.add(sync);
@@ -1232,14 +1232,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
 		final TaskConfig tailConfig;
 		
-		AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
+		JobVertex rootOfStepFunctionVertex = (JobVertex) this.vertices.get(rootOfStepFunction);
 		if (rootOfStepFunctionVertex == null) {
 			// last op is chained
 			final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
 			if (taskInChain == null) {
 				throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
 			}
-			rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+			rootOfStepFunctionVertex = (JobVertex) taskInChain.getContainingVertex();
 
 			// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
 			tailConfig = taskInChain.getTaskConfig();
@@ -1262,7 +1262,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig tailConfigOfTerminationCriterion;
 		// If we have a termination criterion and it is not an intermediate node
 		if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
-			AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
+			JobVertex rootOfTerminationCriterionVertex = (JobVertex) this.vertices.get(rootOfTerminationCriterion);
 			
 			
 			if (rootOfTerminationCriterionVertex == null) {
@@ -1271,7 +1271,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				if (taskInChain == null) {
 					throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
 				}
-				rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+				rootOfTerminationCriterionVertex = (JobVertex) taskInChain.getContainingVertex();
 
 				// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
 				tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
@@ -1312,7 +1312,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	private void finalizeWorksetIteration(IterationDescriptor descr) {
 		final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
-		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final JobVertex headVertex = descr.getHeadTask();
 		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
 		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
 		
@@ -1344,7 +1344,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// --------------------------- create the sync task ---------------------------
 		final TaskConfig syncConfig;
 		{
-			final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")");
+			final JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")");
 			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 			sync.setParallelism(1);
 			this.auxVertices.add(sync);
@@ -1381,14 +1381,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			{
 				// get the vertex for the workset update
 				final TaskConfig worksetTailConfig;
-				AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
+				JobVertex nextWorksetVertex = (JobVertex) this.vertices.get(nextWorksetNode);
 				if (nextWorksetVertex == null) {
 					// nextWorksetVertex is chained
 					TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
 					}
-					nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					nextWorksetVertex = (JobVertex) taskInChain.getContainingVertex();
 					worksetTailConfig = taskInChain.getTaskConfig();
 				} else {
 					worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
@@ -1406,14 +1406,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 			{
 				final TaskConfig solutionDeltaConfig;
-				AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
+				JobVertex solutionDeltaVertex = (JobVertex) this.vertices.get(solutionDeltaNode);
 				if (solutionDeltaVertex == null) {
 					// last op is chained
 					TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
 					}
-					solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					solutionDeltaVertex = (JobVertex) taskInChain.getContainingVertex();
 					solutionDeltaConfig = taskInChain.getTaskConfig();
 				} else {
 					solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
@@ -1501,7 +1501,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		private final String taskName;
 		
-		private AbstractJobVertex containingVertex;
+		private JobVertex containingVertex;
 
 		TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
 					String taskName) {
@@ -1522,11 +1522,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			return this.taskName;
 		}
 		
-		public AbstractJobVertex getContainingVertex() {
+		public JobVertex getContainingVertex() {
 			return this.containingVertex;
 		}
 		
-		public void setContainingVertex(AbstractJobVertex containingVertex) {
+		public void setContainingVertex(JobVertex containingVertex) {
 			this.containingVertex = containingVertex;
 		}
 	}
@@ -1535,7 +1535,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		private final IterationPlanNode iterationNode;
 		
-		private AbstractJobVertex headTask;
+		private JobVertex headTask;
 		
 		private TaskConfig headConfig;
 		
@@ -1552,7 +1552,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			return iterationNode;
 		}
 		
-		public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
+		public void setHeadTask(JobVertex headTask, TaskConfig headConfig) {
 			this.headTask = headTask;
 			this.headFinalResultConfig = new TaskConfig(new Configuration());
 			
@@ -1564,7 +1564,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			this.headConfig = headConfig;
 		}
 		
-		public AbstractJobVertex getHeadTask() {
+		public JobVertex getHeadTask() {
 			return headTask;
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
index 15cb03f..a2a8d42 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
@@ -65,7 +65,7 @@ public class TempInIterationsTest {
 		JobGraph jg = jgg.compileJobGraph(oPlan);
 
 		boolean solutionSetUpdateChecked = false;
-		for(AbstractJobVertex v : jg.getVertices()) {
+		for(JobVertex v : jg.getVertices()) {
 			if(v.getName().equals("SolutionSet Delta")) {
 
 				// check if input of solution set delta is temped

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 17eb3f6..84cbab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -455,7 +455,7 @@ public class ExecutionGraph implements Serializable {
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 
-	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
+	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
 					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
@@ -463,7 +463,7 @@ public class ExecutionGraph implements Serializable {
 
 		final long createTimestamp = System.currentTimeMillis();
 
-		for (AbstractJobVertex jobVertex : topologiallySorted) {
+		for (JobVertex jobVertex : topologiallySorted) {
 
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e53bc10..fcc0e9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -57,7 +57,7 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private final ExecutionGraph graph;
 	
-	private final AbstractJobVertex jobVertex;
+	private final JobVertex jobVertex;
 	
 	private final ExecutionVertex[] taskVertices;
 
@@ -81,12 +81,12 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private InputSplitAssigner splitAssigner;
 	
-	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
+	public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
 							int defaultParallelism, FiniteDuration timeout) throws JobException {
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
 	}
 	
-	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
+	public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
 							int defaultParallelism, FiniteDuration timeout, long createTimestamp)
 			throws JobException
 	{
@@ -169,7 +169,7 @@ public class ExecutionJobVertex implements Serializable {
 		return graph;
 	}
 	
-	public AbstractJobVertex getJobVertex() {
+	public JobVertex getJobVertex() {
 		return jobVertex;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
deleted file mode 100644
index 63968b0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-
-import com.google.common.base.Preconditions;
-
-/**
- * An abstract base class for a job vertex.
- */
-public class AbstractJobVertex implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String DEFAULT_NAME = "(unnamed vertex)";
-	
-	
-	// --------------------------------------------------------------------------------------------
-	// Members that define the structure / topology of the graph
-	// --------------------------------------------------------------------------------------------
-
-	/** The ID of the vertex. */
-	private final JobVertexID id;
-
-	/** List of produced data sets, one per writer */
-	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
-
-	/** List of edges with incoming data. One per Reader. */
-	private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
-
-	/** Number of subtasks to split this task into at runtime.*/
-	private int parallelism = -1;
-
-	/** Custom configuration passed to the assigned task at runtime. */
-	private Configuration configuration;
-
-	/** The class of the invokable. */
-	private String invokableClassName;
-
-	/** Optionally, a source of input splits */
-	private InputSplitSource<?> inputSplitSource;
-	
-	/** The name of the vertex */
-	private String name;
-	
-	/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
-	private SlotSharingGroup slotSharingGroup;
-	
-	/** The group inside which the vertex subtasks share slots */
-	private CoLocationGroup coLocationGroup;
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Constructs a new job vertex and assigns it with the given name.
-	 * 
-	 * @param name The name of the new job vertex.
-	 */
-	public AbstractJobVertex(String name) {
-		this(name, null);
-	}
-	
-	/**
-	 * Constructs a new job vertex and assigns it with the given name.
-	 * 
-	 * @param name The name of the new job vertex.
-	 * @param id The id of the job vertex.
-	 */
-	public AbstractJobVertex(String name, JobVertexID id) {
-		this.name = name == null ? DEFAULT_NAME : name;
-		this.id = id == null ? new JobVertexID() : id;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Returns the ID of this job vertex.
-	 * 
-	 * @return The ID of this job vertex
-	 */
-	public JobVertexID getID() {
-		return this.id;
-	}
-	
-	/**
-	 * Returns the name of the vertex.
-	 * 
-	 * @return The name of the vertex.
-	 */
-	public String getName() {
-		return this.name;
-	}
-	
-	/**
-	 * Sets the name of the vertex
-	 * 
-	 * @param name The new name.
-	 */
-	public void setName(String name) {
-		this.name = name == null ? DEFAULT_NAME : name;
-	}
-
-	/**
-	 * Returns the number of produced intermediate data sets.
-	 * 
-	 * @return The number of produced intermediate data sets.
-	 */
-	public int getNumberOfProducedIntermediateDataSets() {
-		return this.results.size();
-	}
-
-	/**
-	 * Returns the number of inputs.
-	 * 
-	 * @return The number of inputs.
-	 */
-	public int getNumberOfInputs() {
-		return this.inputs.size();
-	}
-
-	/**
-	 * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
-	 * 
-	 * @return the vertex's configuration object
-	 */
-	public Configuration getConfiguration() {
-		if (this.configuration == null) {
-			this.configuration = new Configuration();
-		}
-		return this.configuration;
-	}
-	
-	public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
-		Preconditions.checkNotNull(invokable);
-		this.invokableClassName = invokable.getName();
-	}
-	
-	/**
-	 * Returns the name of the invokable class which represents the task of this vertex.
-	 * 
-	 * @return The name of the invokable class, <code>null</code> if not set.
-	 */
-	public String getInvokableClassName() {
-		return this.invokableClassName;
-	}
-	
-	/**
-	 * Returns the invokable class which represents the task of this vertex
-	 * 
-	 * @param cl The classloader used to resolve user-defined classes
-	 * @return The invokable class, <code>null</code> if it is not set
-	 */
-	public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
-		if (cl == null) {
-			throw new NullPointerException("The classloader must not be null.");
-		}
-		if (invokableClassName == null) {
-			return null;
-		}
-		
-		try {
-			return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("The user-code class could not be resolved.", e);
-		}
-		catch (ClassCastException e) {
-			throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
-		}
-	}
-	
-	/**
-	 * Gets the parallelism of the task.
-	 * 
-	 * @return The parallelism of the task.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	/**
-	 * Sets the parallelism for the task.
-	 * 
-	 * @param parallelism The parallelism for the task.
-	 */
-	public void setParallelism(int parallelism) {
-		if (parallelism < 1) {
-			throw new IllegalArgumentException("The parallelism must be at least one.");
-		}
-		this.parallelism = parallelism;
-	}
-	
-	public InputSplitSource<?> getInputSplitSource() {
-		return inputSplitSource;
-	}
-
-	public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
-		this.inputSplitSource = inputSplitSource;
-	}
-	
-	public List<IntermediateDataSet> getProducedDataSets() {
-		return this.results;
-	}
-	
-	public List<JobEdge> getInputs() {
-		return this.inputs;
-	}
-	
-	/**
-	 * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
-	 * slot sharing group can run one subtask each in the same slot.
-	 * 
-	 * @param grp The slot sharing group to associate the vertex with.
-	 */
-	public void setSlotSharingGroup(SlotSharingGroup grp) {
-		if (this.slotSharingGroup != null) {
-			this.slotSharingGroup.removeVertexFromGroup(id);
-		}
-		
-		this.slotSharingGroup = grp;
-		if (grp != null) {
-			grp.addVertexToGroup(id);
-		}
-	}
-	
-	/**
-	 * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
-	 * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
-	 * a slot sharing group, this method returns {@code null}.
-	 * 
-	 * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
-	 */
-	public SlotSharingGroup getSlotSharingGroup() {
-		return slotSharingGroup;
-	}
-	
-	/**
-	 * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
-	 * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
-	 * instance (TaskManager) as the n'th subtask of the given vertex.
-	 * 
-	 * NOTE: Co-location is only possible between vertices in a slot sharing group.
-	 * 
-	 * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
-	 * respective vertex must be a (transitive) input of this vertex.
-	 * 
-	 * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
-	 * 
-	 * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
-	 *                                  slot sharing group.
-	 * 
-	 * @see #setSlotSharingGroup(SlotSharingGroup)
-	 */
-	public void setStrictlyCoLocatedWith(AbstractJobVertex strictlyCoLocatedWith) {
-		if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
-			throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
-		}
-		
-		CoLocationGroup thisGroup = this.coLocationGroup;
-		CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
-		
-		if (otherGroup == null) {
-			if (thisGroup == null) {
-				CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
-				this.coLocationGroup = group;
-				strictlyCoLocatedWith.coLocationGroup = group;
-			}
-			else {
-				thisGroup.addVertex(strictlyCoLocatedWith);
-				strictlyCoLocatedWith.coLocationGroup = thisGroup;
-			}
-		}
-		else {
-			if (thisGroup == null) {
-				otherGroup.addVertex(this);
-				this.coLocationGroup = otherGroup;
-			}
-			else {
-				// both had yet distinct groups, we need to merge them
-				thisGroup.mergeInto(otherGroup);
-			}
-		}
-	}
-	
-	public CoLocationGroup getCoLocationGroup() {
-		return coLocationGroup;
-	}
-	
-	public void updateCoLocationGroup(CoLocationGroup group) {
-		this.coLocationGroup = group;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
-		return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
-	}
-
-	public IntermediateDataSet createAndAddResultDataSet(
-			IntermediateDataSetID id,
-			ResultPartitionType partitionType) {
-
-		IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
-		this.results.add(result);
-		return result;
-	}
-
-	public void connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
-		JobEdge edge = new JobEdge(dataSet, this, distPattern);
-		this.inputs.add(edge);
-		dataSet.addConsumer(edge);
-	}
-
-	public void connectNewDataSetAsInput(AbstractJobVertex input, DistributionPattern distPattern) {
-		connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
-	}
-
-	public void connectNewDataSetAsInput(
-			AbstractJobVertex input,
-			DistributionPattern distPattern,
-			ResultPartitionType partitionType) {
-
-		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
-		JobEdge edge = new JobEdge(dataSet, this, distPattern);
-		this.inputs.add(edge);
-		dataSet.addConsumer(edge);
-	}
-
-	public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
-		JobEdge edge = new JobEdge(dataSetId, this, distPattern);
-		this.inputs.add(edge);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public boolean isInputVertex() {
-		return this.inputs.isEmpty();
-	}
-	
-	public boolean isOutputVertex() {
-		return this.results.isEmpty();
-	}
-	
-	public boolean hasNoConnectedInputs() {
-		for (JobEdge edge : inputs) {
-			if (!edge.isIdReference()) {
-				return false;
-			}
-		}
-		
-		return true;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * A hook that can be overwritten by sub classes to implement logic that is called by the 
-	 * master when the job starts.
-	 * 
-	 * @param loader The class loader for user defined code.
-	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
-	 */
-	public void initializeOnMaster(ClassLoader loader) throws Exception {}
-	
-	/**
-	 * A hook that can be overwritten by sub classes to implement logic that is called by the 
-	 * master after the job completed.
-	 * 
-	 * @param loader The class loader for user defined code.
-	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
-	 */
-	public void finalizeOnMaster(ClassLoader loader) throws Exception {}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return this.name + " (" + this.invokableClassName + ')';
-	}
-}
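
The setStrictlyCoLocatedWith javadoc in the class above spells out two preconditions: both vertices must share a slot sharing group, and this vertex must (transitively) consume the output of the vertex it is co-located with. A short sketch of that contract against the renamed class; the vertex names and the POINTWISE pattern are illustrative only:

    SlotSharingGroup sharingGroup = new SlotSharingGroup();

    JobVertex head = new JobVertex("IterationHead");
    JobVertex tail = new JobVertex("IterationTail");
    head.setSlotSharingGroup(sharingGroup);
    tail.setSlotSharingGroup(sharingGroup);

    // tail consumes head's output, so it may be strictly co-located with head
    tail.connectNewDataSetAsInput(head, DistributionPattern.POINTWISE);
    tail.setStrictlyCoLocatedWith(head);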

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 0ea0da7..011850c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 
-public class InputFormatVertex extends AbstractJobVertex {
+public class InputFormatVertex extends JobVertex {
 
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 86888e2..7c8f32b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -39,7 +39,7 @@ public class IntermediateDataSet implements java.io.Serializable {
 	
 	private final IntermediateDataSetID id; 		// the identifier
 	
-	private final AbstractJobVertex producer;		// the operation that produced this data set
+	private final JobVertex producer;			// the operation that produced this data set
 	
 	private final List<JobEdge> consumers = new ArrayList<JobEdge>();
 
@@ -48,15 +48,15 @@ public class IntermediateDataSet implements java.io.Serializable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public IntermediateDataSet(AbstractJobVertex producer) {
+	public IntermediateDataSet(JobVertex producer) {
 		this(new IntermediateDataSetID(), producer);
 	}
 	
-	public IntermediateDataSet(IntermediateDataSetID id, AbstractJobVertex producer) {
+	public IntermediateDataSet(IntermediateDataSetID id, JobVertex producer) {
 		this(id, ResultPartitionType.PIPELINED, producer);
 	}
 
-	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, AbstractJobVertex producer) {
+	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
 		this.id = checkNotNull(id);
 		this.producer = checkNotNull(producer);
 		this.resultType = checkNotNull(resultType);
@@ -68,7 +68,7 @@ public class IntermediateDataSet implements java.io.Serializable {
 		return id;
 	}
 
-	public AbstractJobVertex getProducer() {
+	public JobVertex getProducer() {
 		return producer;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 939f6c4..5faea83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -29,7 +29,7 @@ public class JobEdge implements java.io.Serializable {
 	
 	
 	/** The vertex connected to this edge. */
-	private final AbstractJobVertex target;
+	private final JobVertex target;
 
 	/** The distribution pattern that should be used for this job edge. */
 	private final DistributionPattern distributionPattern;
@@ -47,7 +47,7 @@ public class JobEdge implements java.io.Serializable {
 	 * @param target The operation that is at the target of this edge.
 	 * @param distributionPattern The pattern that defines how the connection behaves in parallel.
 	 */
-	public JobEdge(IntermediateDataSet source, AbstractJobVertex target, DistributionPattern distributionPattern) {
+	public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
 		if (source == null || target == null || distributionPattern == null) {
 			throw new NullPointerException();
 		}
@@ -65,7 +65,7 @@ public class JobEdge implements java.io.Serializable {
 	 * @param target The operation that is at the target of this edge.
 	 * @param distributionPattern The pattern that defines how the connection behaves in parallel.
 	 */
-	public JobEdge(IntermediateDataSetID sourceId, AbstractJobVertex target, DistributionPattern distributionPattern) {
+	public JobEdge(IntermediateDataSetID sourceId, JobVertex target, DistributionPattern distributionPattern) {
 		if (sourceId == null || target == null || distributionPattern == null) {
 			throw new NullPointerException();
 		}
@@ -90,7 +90,7 @@ public class JobEdge implements java.io.Serializable {
 	 * 
 	 * @return The vertex connected to this edge.
 	 */
-	public AbstractJobVertex getTarget() {
+	public JobVertex getTarget() {
 		return target;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 28fa78e..09b415b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -60,7 +60,7 @@ public class JobGraph implements Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	/** List of task vertices included in this job graph. */
-	private final Map<JobVertexID, AbstractJobVertex> taskVertices = new LinkedHashMap<JobVertexID, AbstractJobVertex>();
+	private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
 
 	/** The job configuration attached to this job. */
 	private final Configuration jobConfiguration = new Configuration();
@@ -124,7 +124,7 @@ public class JobGraph implements Serializable {
 	 * 
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(AbstractJobVertex... vertices) {
+	public JobGraph(JobVertex... vertices) {
 		this(null, vertices);
 	}
 
@@ -134,7 +134,7 @@ public class JobGraph implements Serializable {
 	 * @param jobName The name of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(String jobName, AbstractJobVertex... vertices) {
+	public JobGraph(String jobName, JobVertex... vertices) {
 		this(null, jobName, vertices);
 	}
 	
@@ -145,10 +145,10 @@ public class JobGraph implements Serializable {
 	 * @param jobName The name of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(JobID jobId, String jobName, AbstractJobVertex... vertices) {
+	public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
 		this(jobId, jobName);
 		
-		for (AbstractJobVertex vertex : vertices) {
+		for (JobVertex vertex : vertices) {
 			addVertex(vertex);
 		}
 	}
@@ -229,9 +229,9 @@ public class JobGraph implements Serializable {
 	 * @param vertex
 	 *        the new task vertex to be added
 	 */
-	public void addVertex(AbstractJobVertex vertex) {
+	public void addVertex(JobVertex vertex) {
 		final JobVertexID id = vertex.getID();
-		AbstractJobVertex previous = taskVertices.put(id, vertex);
+		JobVertex previous = taskVertices.put(id, vertex);
 		
 		// if we had a prior association, restore and throw an exception
 		if (previous != null) {
@@ -245,7 +245,7 @@ public class JobGraph implements Serializable {
 	 * 
 	 * @return an Iterable to iterate all vertices registered with the job graph
 	 */
-	public Iterable<AbstractJobVertex> getVertices() {
+	public Iterable<JobVertex> getVertices() {
 		return this.taskVertices.values();
 	}
 	
@@ -255,8 +255,8 @@ public class JobGraph implements Serializable {
 	 * 
 	 * @return an array of all job vertices that are registered with the job graph
 	 */
-	public AbstractJobVertex[] getVerticesAsArray() {
-		return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]);
+	public JobVertex[] getVerticesAsArray() {
+		return this.taskVertices.values().toArray(new JobVertex[this.taskVertices.size()]);
 	}
 
 	/**
@@ -295,27 +295,27 @@ public class JobGraph implements Serializable {
 	 *        the ID of the vertex to search for
 	 * @return the vertex with the matching ID or <code>null</code> if no vertex with such ID could be found
 	 */
-	public AbstractJobVertex findVertexByID(JobVertexID id) {
+	public JobVertex findVertexByID(JobVertexID id) {
 		return this.taskVertices.get(id);
 	}
 	
 	// --------------------------------------------------------------------------------------------
 
-	public List<AbstractJobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
+	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
 		// early out on empty lists
 		if (this.taskVertices.isEmpty()) {
 			return Collections.emptyList();
 		}
 		
-		List<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
-		Set<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
+		List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
+		Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
 		
 		// start by finding the vertices with no input edges
 		// and the ones with disconnected inputs (that refer to some standalone data set)
 		{
-			Iterator<AbstractJobVertex> iter = remaining.iterator();
+			Iterator<JobVertex> iter = remaining.iterator();
 			while (iter.hasNext()) {
-				AbstractJobVertex vertex = iter.next();
+				JobVertex vertex = iter.next();
 				
 				if (vertex.hasNoConnectedInputs()) {
 					sorted.add(vertex);
@@ -335,21 +335,21 @@ public class JobGraph implements Serializable {
 				throw new InvalidProgramException("The job graph is cyclic.");
 			}
 			
-			AbstractJobVertex current = sorted.get(startNodePos++);
+			JobVertex current = sorted.get(startNodePos++);
 			addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
 		}
 		
 		return sorted;
 	}
 	
-	private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, List<AbstractJobVertex> target, Set<AbstractJobVertex> remaining) {
+	private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
 		
 		// forward traverse over all produced data sets and all their consumers
 		for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
 			for (JobEdge edge : dataSet.getConsumers()) {
 				
 				// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
-				AbstractJobVertex v = edge.getTarget();
+				JobVertex v = edge.getTarget();
 				if (!remaining.contains(v)) {
 					continue;
 				}
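
For readers following the rename: the JobGraph API keeps its shape in the hunks above, only the vertex type changes from AbstractJobVertex to JobVertex. Below is a minimal, illustrative sketch of the renamed calls (the class JobGraphRenameExample and the vertex names are made up for this example; the constructors and methods are the ones shown in the diff):

    import java.util.List;

    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class JobGraphRenameExample {

        public static void main(String[] args) {
            // two vertices, created directly as JobVertex after the rename
            JobVertex source = new JobVertex("source");
            JobVertex sink = new JobVertex("sink");
            source.setParallelism(2);
            sink.setParallelism(2);

            // the sink consumes a newly created (pipelined) data set of the source
            sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL);

            // the varargs constructor and the topological sort keep their signatures,
            // only the vertex type in them changes
            JobGraph graph = new JobGraph("example job", source, sink);
            List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
            System.out.println(sorted);
        }
    }

The test call sites further down in this commit follow exactly this pattern after the rename.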

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
new file mode 100644
index 0000000..4bf9fc4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The base class for job vertices.
+ */
+public class JobVertex implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DEFAULT_NAME = "(unnamed vertex)";
+	
+	
+	// --------------------------------------------------------------------------------------------
+	// Members that define the structure / topology of the graph
+	// --------------------------------------------------------------------------------------------
+
+	/** The ID of the vertex. */
+	private final JobVertexID id;
+
+	/** List of produced data sets, one per writer */
+	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
+
+	/** List of edges with incoming data. One per Reader. */
+	private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
+
+	/** Number of subtasks to split this task into at runtime.*/
+	private int parallelism = -1;
+
+	/** Custom configuration passed to the assigned task at runtime. */
+	private Configuration configuration;
+
+	/** The class of the invokable. */
+	private String invokableClassName;
+
+	/** Optionally, a source of input splits */
+	private InputSplitSource<?> inputSplitSource;
+	
+	/** The name of the vertex */
+	private String name;
+	
+	/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
+	private SlotSharingGroup slotSharingGroup;
+	
+	/** The group inside which the vertex subtasks share slots */
+	private CoLocationGroup coLocationGroup;
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Constructs a new job vertex and assigns it with the given name.
+	 * 
+	 * @param name The name of the new job vertex.
+	 */
+	public JobVertex(String name) {
+		this(name, null);
+	}
+	
+	/**
+	 * Constructs a new job vertex and assigns it with the given name.
+	 * 
+	 * @param name The name of the new job vertex.
+	 * @param id The id of the job vertex.
+	 */
+	public JobVertex(String name, JobVertexID id) {
+		this.name = name == null ? DEFAULT_NAME : name;
+		this.id = id == null ? new JobVertexID() : id;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Returns the ID of this job vertex.
+	 * 
+	 * @return The ID of this job vertex
+	 */
+	public JobVertexID getID() {
+		return this.id;
+	}
+	
+	/**
+	 * Returns the name of the vertex.
+	 * 
+	 * @return The name of the vertex.
+	 */
+	public String getName() {
+		return this.name;
+	}
+	
+	/**
+	 * Sets the name of the vertex
+	 * 
+	 * @param name The new name.
+	 */
+	public void setName(String name) {
+		this.name = name == null ? DEFAULT_NAME : name;
+	}
+
+	/**
+	 * Returns the number of produced intermediate data sets.
+	 * 
+	 * @return The number of produced intermediate data sets.
+	 */
+	public int getNumberOfProducedIntermediateDataSets() {
+		return this.results.size();
+	}
+
+	/**
+	 * Returns the number of inputs.
+	 * 
+	 * @return The number of inputs.
+	 */
+	public int getNumberOfInputs() {
+		return this.inputs.size();
+	}
+
+	/**
+	 * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
+	 * 
+	 * @return the vertex's configuration object
+	 */
+	public Configuration getConfiguration() {
+		if (this.configuration == null) {
+			this.configuration = new Configuration();
+		}
+		return this.configuration;
+	}
+	
+	public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
+		Preconditions.checkNotNull(invokable);
+		this.invokableClassName = invokable.getName();
+	}
+	
+	/**
+	 * Returns the name of the invokable class which represents the task of this vertex.
+	 * 
+	 * @return The name of the invokable class, <code>null</code> if not set.
+	 */
+	public String getInvokableClassName() {
+		return this.invokableClassName;
+	}
+	
+	/**
+	 * Returns the invokable class which represents the task of this vertex
+	 * 
+	 * @param cl The classloader used to resolve user-defined classes
+	 * @return The invokable class, <code>null</code> if it is not set
+	 */
+	public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
+		if (cl == null) {
+			throw new NullPointerException("The classloader must not be null.");
+		}
+		if (invokableClassName == null) {
+			return null;
+		}
+		
+		try {
+			return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("The user-code class could not be resolved.", e);
+		}
+		catch (ClassCastException e) {
+			throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
+		}
+	}
+	
+	/**
+	 * Gets the parallelism of the task.
+	 * 
+	 * @return The parallelism of the task.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/**
+	 * Sets the parallelism for the task.
+	 * 
+	 * @param parallelism The parallelism for the task.
+	 */
+	public void setParallelism(int parallelism) {
+		if (parallelism < 1) {
+			throw new IllegalArgumentException("The parallelism must be at least one.");
+		}
+		this.parallelism = parallelism;
+	}
+	
+	public InputSplitSource<?> getInputSplitSource() {
+		return inputSplitSource;
+	}
+
+	public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
+		this.inputSplitSource = inputSplitSource;
+	}
+	
+	public List<IntermediateDataSet> getProducedDataSets() {
+		return this.results;
+	}
+	
+	public List<JobEdge> getInputs() {
+		return this.inputs;
+	}
+	
+	/**
+	 * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
+	 * slot sharing group can run one subtask each in the same slot.
+	 * 
+	 * @param grp The slot sharing group to associate the vertex with.
+	 */
+	public void setSlotSharingGroup(SlotSharingGroup grp) {
+		if (this.slotSharingGroup != null) {
+			this.slotSharingGroup.removeVertexFromGroup(id);
+		}
+		
+		this.slotSharingGroup = grp;
+		if (grp != null) {
+			grp.addVertexToGroup(id);
+		}
+	}
+	
+	/**
+	 * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
+	 * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
+	 * a slot sharing group, this method returns {@code null}.
+	 * 
+	 * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
+	 */
+	public SlotSharingGroup getSlotSharingGroup() {
+		return slotSharingGroup;
+	}
+	
+	/**
+	 * Tells this vertex to strictly co-locate its subtasks with the subtasks of the given vertex.
+	 * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
+	 * instance (TaskManager) as the n'th subtask of the given vertex.
+	 * 
+	 * NOTE: Co-location is only possible between vertices in a slot sharing group.
+	 * 
+	 * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
+	 * respective vertex must be a (transitive) input of this vertex.
+	 * 
+	 * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
+	 * 
+	 * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
+	 *                                  slot sharing group.
+	 * 
+	 * @see #setSlotSharingGroup(SlotSharingGroup)
+	 */
+	public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
+		if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
+			throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
+		}
+		
+		CoLocationGroup thisGroup = this.coLocationGroup;
+		CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
+		
+		if (otherGroup == null) {
+			if (thisGroup == null) {
+				CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
+				this.coLocationGroup = group;
+				strictlyCoLocatedWith.coLocationGroup = group;
+			}
+			else {
+				thisGroup.addVertex(strictlyCoLocatedWith);
+				strictlyCoLocatedWith.coLocationGroup = thisGroup;
+			}
+		}
+		else {
+			if (thisGroup == null) {
+				otherGroup.addVertex(this);
+				this.coLocationGroup = otherGroup;
+			}
+			else {
+				// both already had distinct groups, so we need to merge them
+				thisGroup.mergeInto(otherGroup);
+			}
+		}
+	}
+	
+	public CoLocationGroup getCoLocationGroup() {
+		return coLocationGroup;
+	}
+	
+	public void updateCoLocationGroup(CoLocationGroup group) {
+		this.coLocationGroup = group;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
+		return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
+	}
+
+	public IntermediateDataSet createAndAddResultDataSet(
+			IntermediateDataSetID id,
+			ResultPartitionType partitionType) {
+
+		IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
+		this.results.add(result);
+		return result;
+	}
+
+	public void connectDataSetAsInput(IntermediateDataSet dataSet, DistributionPattern distPattern) {
+		JobEdge edge = new JobEdge(dataSet, this, distPattern);
+		this.inputs.add(edge);
+		dataSet.addConsumer(edge);
+	}
+
+	public void connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
+		connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
+	}
+
+	public void connectNewDataSetAsInput(
+			JobVertex input,
+			DistributionPattern distPattern,
+			ResultPartitionType partitionType) {
+
+		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
+		JobEdge edge = new JobEdge(dataSet, this, distPattern);
+		this.inputs.add(edge);
+		dataSet.addConsumer(edge);
+	}
+
+	public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
+		JobEdge edge = new JobEdge(dataSetId, this, distPattern);
+		this.inputs.add(edge);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public boolean isInputVertex() {
+		return this.inputs.isEmpty();
+	}
+	
+	public boolean isOutputVertex() {
+		return this.results.isEmpty();
+	}
+	
+	public boolean hasNoConnectedInputs() {
+		for (JobEdge edge : inputs) {
+			if (!edge.isIdReference()) {
+				return false;
+			}
+		}
+		
+		return true;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * A hook that can be overridden by subclasses to implement logic that is called by the
+	 * master when the job starts.
+	 * 
+	 * @param loader The class loader for user defined code.
+	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+	 */
+	public void initializeOnMaster(ClassLoader loader) throws Exception {}
+	
+	/**
+	 * A hook that can be overridden by subclasses to implement logic that is called by the
+	 * master after the job has completed.
+	 * 
+	 * @param loader The class loader for user defined code.
+	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+	 */
+	public void finalizeOnMaster(ClassLoader loader) throws Exception {}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return this.name + " (" + this.invokableClassName + ')';
+	}
+}
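
The co-location contract documented in setStrictlyCoLocatedWith above is easiest to see in a short sketch. This is illustrative only (CoLocationExample and the vertex names are hypothetical) and relies on the preconditions spelled out in the javadoc: both vertices must already share a SlotSharingGroup, and the co-located vertex must be a (transitive) input of this one.

    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

    public class CoLocationExample {

        public static void main(String[] args) {
            JobVertex head = new JobVertex("head");
            JobVertex tail = new JobVertex("tail");
            head.setParallelism(4);
            tail.setParallelism(4);

            // the tail must (transitively) depend on the vertex it is co-located with
            tail.connectNewDataSetAsInput(head, DistributionPattern.POINTWISE);

            // co-location is only allowed inside a common slot sharing group ...
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            head.setSlotSharingGroup(sharingGroup);
            tail.setSlotSharingGroup(sharingGroup);

            // ... otherwise setStrictlyCoLocatedWith throws IllegalArgumentException
            tail.setStrictlyCoLocatedWith(head);
        }
    }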

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 2a1f89c..c9ac564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
  * A task vertex that runs an initialization on the master, trying to deserialize an output format
  * and initialize it on the master, if necessary.
  */
-public class OutputFormatVertex extends AbstractJobVertex {
+public class OutputFormatVertex extends JobVertex {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index a70cd2b..85dd5c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -108,9 +108,9 @@ public abstract class AbstractInvokable {
 	}
 
 	/**
-	 * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}.
+	 * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}.
 	 * 
-	 * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}
+	 * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobVertex}
 	 */
 	public Configuration getTaskConfiguration() {
 		return this.environment.getTaskConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index a88c89d..74e34e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 
 import com.google.common.base.Preconditions;
 
@@ -43,7 +43,7 @@ public class CoLocationGroup implements java.io.Serializable {
 	private final AbstractID id = new AbstractID();
 	
 	/** The vertices participating in the co-location group */
-	private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>();
+	private final List<JobVertex> vertices = new ArrayList<JobVertex>();
 	
 	/** The constraints, which hold the shared slots for the co-located operators */
 	private transient ArrayList<CoLocationConstraint> constraints;
@@ -52,15 +52,15 @@ public class CoLocationGroup implements java.io.Serializable {
 	
 	public CoLocationGroup() {}
 	
-	public CoLocationGroup(AbstractJobVertex... vertices) {
-		for (AbstractJobVertex v : vertices) {
+	public CoLocationGroup(JobVertex... vertices) {
+		for (JobVertex v : vertices) {
 			this.vertices.add(v);
 		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public void addVertex(AbstractJobVertex vertex) {
+	public void addVertex(JobVertex vertex) {
 		Preconditions.checkNotNull(vertex);
 		this.vertices.add(vertex);
 	}
@@ -68,7 +68,7 @@ public class CoLocationGroup implements java.io.Serializable {
 	public void mergeInto(CoLocationGroup other) {
 		Preconditions.checkNotNull(other);
 		
-		for (AbstractJobVertex v : this.vertices) {
+		for (JobVertex v : this.vertices) {
 			v.updateCoLocationGroup(other);
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 3d9a155..932e366 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -24,7 +24,7 @@ import akka.pattern.Patterns;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -55,7 +55,7 @@ public class CoordinatorShutdownTest {
 			cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
 			
 			// build a test graph with snapshotting enabled
-			AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
@@ -102,7 +102,7 @@ public class CoordinatorShutdownTest {
 			cluster = new LocalFlinkMiniCluster(new Configuration(), true);
 			
 			// build a test graph with snapshotting enabled
-			AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1bd0af73/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 4d10585..693e014 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import java.util.Arrays;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -32,10 +32,10 @@ public class AllVerticesIteratorTest {
 	public void testAllVertices() {
 		try {
 			
-			AbstractJobVertex v1 = new AbstractJobVertex("v1");
-			AbstractJobVertex v2 = new AbstractJobVertex("v2");
-			AbstractJobVertex v3 = new AbstractJobVertex("v3");
-			AbstractJobVertex v4 = new AbstractJobVertex("v4");
+			JobVertex v1 = new JobVertex("v1");
+			JobVertex v2 = new JobVertex("v2");
+			JobVertex v3 = new JobVertex("v3");
+			JobVertex v4 = new JobVertex("v4");
 			
 			v1.setParallelism(1);
 			v2.setParallelism(7);