You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:29 UTC

[05/63] [abbrv] Refactor job graph construction to incremental attachment based

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
index 0698b56..c8ded86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/JobEventTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 import static org.junit.Assert.assertEquals;
@@ -25,18 +24,16 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
 /**
  * This class contains tests concerning the serialization/deserialization of job events which have been derived from
  * {@link org.apache.flink.runtime.event.job.AbstractEvent}.
- * 
  */
 public class JobEventTest {
+
 	/**
 	 * This test checks the correct serialization/deserialization of a {@link JobEvent}.
 	 */
@@ -57,31 +54,4 @@ public class JobEventTest {
 			fail(ioe.getMessage());
 		}
 	}
-
-	/**
-	 * This test checks the correct serialization/deserialization of a {@link VertexEvent}.
-	 */
-	@Test
-	public void testVertexEvent() {
-
-		try {
-
-			final VertexEvent orig = new VertexEvent(23423423L, new JobVertexID(), "Test Vertex", 2, 0,
-				ExecutionState.READY, "Test Description");
-			final VertexEvent copy = (VertexEvent) CommonTestUtils.createCopyWritable(orig);
-
-			assertEquals(orig.getTimestamp(), copy.getTimestamp());
-			assertEquals(orig.getJobVertexID(), copy.getJobVertexID());
-			assertEquals(orig.getJobVertexName(), copy.getJobVertexName());
-			assertEquals(orig.getTotalNumberOfSubtasks(), copy.getTotalNumberOfSubtasks());
-			assertEquals(orig.getIndexOfSubtask(), copy.getIndexOfSubtask());
-			assertEquals(orig.getCurrentExecutionState(), copy.getCurrentExecutionState());
-			assertEquals(orig.getDescription(), copy.getDescription());
-			assertEquals(orig.hashCode(), copy.hashCode());
-			assertTrue(orig.equals(copy));
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
index 2a30ae4..99e750b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/ManagementEventTest.java
@@ -16,13 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
+import org.apache.flink.runtime.event.job.RecentJobEvent;
+import org.apache.flink.runtime.execution.ExecutionState2;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.managementgraph.ManagementVertexID;
@@ -31,9 +32,9 @@ import org.junit.Test;
 
 /**
  * This test checks the proper serialization and deserialization of job events.
- * 
  */
 public class ManagementEventTest {
+
 	/**
 	 * The time stamp used during the tests.
 	 */
@@ -51,7 +52,7 @@ public class ManagementEventTest {
 	public void testExecutionStateChangeEvent() {
 
 		final ExecutionStateChangeEvent orig = new ExecutionStateChangeEvent(TIMESTAMP, new ManagementVertexID(),
-			ExecutionState.READY);
+			ExecutionState2.DEPLOYING);
 
 		final ExecutionStateChangeEvent copy = (ExecutionStateChangeEvent) ManagementTestUtils.createCopy(orig);
 
@@ -68,8 +69,7 @@ public class ManagementEventTest {
 	@Test
 	public void testRecentJobEvent() {
 
-		final RecentJobEvent orig = new RecentJobEvent(new JobID(), JOBNAME, JobStatus.SCHEDULED, true, TIMESTAMP,
-			TIMESTAMP);
+		final RecentJobEvent orig = new RecentJobEvent(new JobID(), JOBNAME, JobStatus.RUNNING, true, TIMESTAMP, TIMESTAMP);
 
 		final RecentJobEvent copy = (RecentJobEvent) ManagementTestUtils.createCopy(orig);
 
@@ -82,20 +82,4 @@ public class ManagementEventTest {
 		assertEquals(orig.hashCode(), copy.hashCode());
 		assertTrue(orig.equals(copy));
 	}
-
-	/**
-	 * Tests serialization/deserialization for {@link VertexAssignmentEvent}.
-	 */
-	@Test
-	public void testVertexAssignmentEvent() {
-
-		final VertexAssignmentEvent orig = new VertexAssignmentEvent(TIMESTAMP, new ManagementVertexID(), "test");
-		final VertexAssignmentEvent copy = (VertexAssignmentEvent) ManagementTestUtils.createCopy(orig);
-
-		assertEquals(orig.getVertexID(), copy.getVertexID());
-		assertEquals(orig.getTimestamp(), copy.getTimestamp());
-		assertEquals(orig.getInstanceName(), copy.getInstanceName());
-		assertEquals(orig.hashCode(), copy.hashCode());
-		assertTrue(orig.equals(copy));
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
new file mode 100644
index 0000000..498f773
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Arrays;
+
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AllVerticesIteratorTest {
+
+	@Test
+	public void testAllVertices() {
+		try {
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1");
+			AbstractJobVertex v2 = new AbstractJobVertex("v2");
+			AbstractJobVertex v3 = new AbstractJobVertex("v3");
+			AbstractJobVertex v4 = new AbstractJobVertex("v4");
+			
+			v1.setParallelism(1);
+			v2.setParallelism(7);
+			v3.setParallelism(3);
+			v4.setParallelism(2);
+			
+			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+					
+			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1);
+			ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1);
+			ExecutionJobVertex ejv3 = new ExecutionJobVertex(eg, v3, 1);
+			ExecutionJobVertex ejv4 = new ExecutionJobVertex(eg, v4, 1);
+			
+			AllVerticesIterator iter = new AllVerticesIterator(Arrays.asList(ejv1, ejv2, ejv3, ejv4).iterator());
+			
+			int numReturned = 0;
+			while (iter.hasNext()) {
+				iter.hasNext();
+				Assert.assertNotNull(iter.next());
+				numReturned++;
+			}
+			
+			Assert.assertEquals(13, numReturned);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
new file mode 100644
index 0000000..b6f532e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.mockito.Matchers;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.LogUtils;
+
+/**
+ * This class contains tests concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects.
+ */
+public class ExecutionGraphConstructionTest {
+	
+	@BeforeClass
+	public static void setLogLevel() {
+		LogUtils.initializeDefaultTestConsoleLogger();
+	}
+
+	
+	/**
+	 * Creates a JobGraph of the following form:
+	 * 
+	 * <pre>
+	 *  v1--->v2-->\
+	 *              \
+	 *               v4 --->\
+	 *        ----->/        \
+	 *  v3-->/                v5
+	 *       \               /
+	 *        ------------->/
+	 * </pre>
+	 */
+	@Test
+	public void testCreateSimpleGraphBipartite() {
+		
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		
+		v1.setParallelism(5);
+		v2.setParallelism(7);
+		v3.setParallelism(2);
+		v4.setParallelism(11);
+		v5.setParallelism(4);
+		
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+		
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+	}
+	
+	@Test
+	public void testAttachViaDataSets() {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+		
+		// construct part one of the execution graph
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		
+		v1.setParallelism(5);
+		v2.setParallelism(7);
+		v3.setParallelism(2);
+		
+		// this creates an intermediate result for v1
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+		
+		// create results for v2 and v3
+		IntermediateDataSet v2result = v2.createAndAddResultDataSet();
+		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
+		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+		
+		
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		// attach the second part of the graph
+		
+		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		v4.setParallelism(11);
+		v5.setParallelism(4);
+		
+		v4.connectDataSetAsInput(v2result, DistributionPattern.BIPARTITE);
+		v4.connectDataSetAsInput(v3result_1, DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+		v5.connectDataSetAsInput(v3result_2, DistributionPattern.BIPARTITE);
+		
+		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+		
+		try {
+			eg.attachJobGraph(ordered2);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		// verify
+		verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+	}
+	
+	@Test
+	public void testAttachViaIds() {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+		
+		// construct part one of the execution graph
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		
+		v1.setParallelism(5);
+		v2.setParallelism(7);
+		v3.setParallelism(2);
+		
+		// this creates an intermediate result for v1
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+		
+		// create results for v2 and v3
+		IntermediateDataSet v2result = v2.createAndAddResultDataSet();
+		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
+		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+		
+		
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		// attach the second part of the graph
+		
+		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		v4.setParallelism(11);
+		v5.setParallelism(4);
+		
+		v4.connectIdInput(v2result.getId(), DistributionPattern.BIPARTITE);
+		v4.connectIdInput(v3result_1.getId(), DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+		v5.connectIdInput(v3result_2.getId(), DistributionPattern.BIPARTITE);
+		
+		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));
+		
+		try {
+			eg.attachJobGraph(ordered2);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		// verify
+		verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
+	}
+	
+	private void verifyTestGraph(ExecutionGraph eg, JobID jobId,
+				AbstractJobVertex v1, AbstractJobVertex v2, AbstractJobVertex v3,
+				AbstractJobVertex v4, AbstractJobVertex v5)
+	{
+		Map<JobVertexID, ExecutionJobVertex> vertices = eg.getAllVertices();
+		
+		// verify v1
+		{
+			ExecutionJobVertex e1 = vertices.get(v1.getID());
+			assertNotNull(e1);
+			
+			// basic properties
+			assertEquals(v1.getParallelism(), e1.getParallelism());
+			assertEquals(v1.getID(), e1.getJobVertexId());
+			assertEquals(jobId, e1.getJobId());
+			assertEquals(v1, e1.getJobVertex());
+			
+			// produced data sets
+			assertEquals(1, e1.getProducedDataSets().length);
+			assertEquals(v1.getProducedDataSets().get(0).getId(), e1.getProducedDataSets()[0].getId());
+			assertEquals(v1.getParallelism(), e1.getProducedDataSets()[0].getPartitions().length);
+			
+			// task vertices
+			assertEquals(v1.getParallelism(), e1.getTaskVertices().length);
+			
+			int num = 0;
+			for (ExecutionVertex2 ev : e1.getTaskVertices()) {
+				assertEquals(jobId, ev.getJobId());
+				assertEquals(v1.getID(), ev.getJobvertexId());
+				
+				assertEquals(v1.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+				assertEquals(num++, ev.getParallelSubtaskIndex());
+				
+				assertEquals(0, ev.getNumberOfInputs());
+			}
+		}
+		
+		// verify v2
+		{
+			ExecutionJobVertex e2 = vertices.get(v2.getID());
+			assertNotNull(e2);
+			
+			// produced data sets
+			assertEquals(1, e2.getProducedDataSets().length);
+			assertEquals(v2.getProducedDataSets().get(0).getId(), e2.getProducedDataSets()[0].getId());
+			assertEquals(v2.getParallelism(), e2.getProducedDataSets()[0].getPartitions().length);
+			
+			// task vertices
+			assertEquals(v2.getParallelism(), e2.getTaskVertices().length);
+			
+			int num = 0;
+			for (ExecutionVertex2 ev : e2.getTaskVertices()) {
+				assertEquals(jobId, ev.getJobId());
+				assertEquals(v2.getID(), ev.getJobvertexId());
+				
+				assertEquals(v2.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+				assertEquals(num++, ev.getParallelSubtaskIndex());
+				
+				assertEquals(1, ev.getNumberOfInputs());
+				ExecutionEdge2[] inputs = ev.getInputEdges(0);
+				assertEquals(v1.getParallelism(), inputs.length);
+				
+				int sumOfPartitions = 0;
+				for (ExecutionEdge2 inEdge : inputs) {
+					assertEquals(0,inEdge.getInputNum());
+					sumOfPartitions += inEdge.getSource().getPartition();
+				}
+				
+				assertEquals(10, sumOfPartitions);
+			}
+		}
+		
+		// verify v3
+		{
+			ExecutionJobVertex e3 = vertices.get(v3.getID());
+			assertNotNull(e3);
+			
+			// produced data sets
+			assertEquals(2, e3.getProducedDataSets().length);
+			assertEquals(v3.getProducedDataSets().get(0).getId(), e3.getProducedDataSets()[0].getId());
+			assertEquals(v3.getProducedDataSets().get(1).getId(), e3.getProducedDataSets()[1].getId());
+			assertEquals(v3.getParallelism(), e3.getProducedDataSets()[0].getPartitions().length);
+			assertEquals(v3.getParallelism(), e3.getProducedDataSets()[1].getPartitions().length);
+			
+			// task vertices
+			assertEquals(v3.getParallelism(), e3.getTaskVertices().length);
+			
+			int num = 0;
+			for (ExecutionVertex2 ev : e3.getTaskVertices()) {
+				assertEquals(jobId, ev.getJobId());
+				assertEquals(v3.getID(), ev.getJobvertexId());
+				
+				assertEquals(v3.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+				assertEquals(num++, ev.getParallelSubtaskIndex());
+				
+				assertEquals(0, ev.getNumberOfInputs());
+			}
+		}
+
+		// verify v4
+		{
+			ExecutionJobVertex e4 = vertices.get(v4.getID());
+			assertNotNull(e4);
+			
+			// produced data sets
+			assertEquals(1, e4.getProducedDataSets().length);
+			assertEquals(v4.getProducedDataSets().get(0).getId(), e4.getProducedDataSets()[0].getId());
+			
+			// task vertices
+			assertEquals(v4.getParallelism(), e4.getTaskVertices().length);
+			
+			int num = 0;
+			for (ExecutionVertex2 ev : e4.getTaskVertices()) {
+				assertEquals(jobId, ev.getJobId());
+				assertEquals(v4.getID(), ev.getJobvertexId());
+				
+				assertEquals(v4.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+				assertEquals(num++, ev.getParallelSubtaskIndex());
+				
+				assertEquals(2, ev.getNumberOfInputs());
+				// first input
+				{
+					ExecutionEdge2[] inputs = ev.getInputEdges(0);
+					assertEquals(v2.getParallelism(), inputs.length);
+					
+					int sumOfPartitions = 0;
+					for (ExecutionEdge2 inEdge : inputs) {
+						assertEquals(0, inEdge.getInputNum());
+						sumOfPartitions += inEdge.getSource().getPartition();
+					}
+					
+					assertEquals(21, sumOfPartitions);
+				}
+				// second input
+				{
+					ExecutionEdge2[] inputs = ev.getInputEdges(1);
+					assertEquals(v3.getParallelism(), inputs.length);
+					
+					int sumOfPartitions = 0;
+					for (ExecutionEdge2 inEdge : inputs) {
+						assertEquals(1, inEdge.getInputNum());
+						sumOfPartitions += inEdge.getSource().getPartition();
+					}
+					
+					assertEquals(1, sumOfPartitions);
+				}
+			}
+		}
+		
+		// verify v5
+		{
+			ExecutionJobVertex e5 = vertices.get(v5.getID());
+			assertNotNull(e5);
+			
+			// produced data sets
+			assertEquals(0, e5.getProducedDataSets().length);
+			
+			// task vertices
+			assertEquals(v5.getParallelism(), e5.getTaskVertices().length);
+			
+			int num = 0;
+			for (ExecutionVertex2 ev : e5.getTaskVertices()) {
+				assertEquals(jobId, ev.getJobId());
+				assertEquals(v5.getID(), ev.getJobvertexId());
+				
+				assertEquals(v5.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+				assertEquals(num++, ev.getParallelSubtaskIndex());
+				
+				assertEquals(2, ev.getNumberOfInputs());
+				// first input
+				{
+					ExecutionEdge2[] inputs = ev.getInputEdges(0);
+					assertEquals(v4.getParallelism(), inputs.length);
+					
+					int sumOfPartitions = 0;
+					for (ExecutionEdge2 inEdge : inputs) {
+						assertEquals(0, inEdge.getInputNum());
+						sumOfPartitions += inEdge.getSource().getPartition();
+					}
+					
+					assertEquals(55, sumOfPartitions);
+				}
+				// second input
+				{
+					ExecutionEdge2[] inputs = ev.getInputEdges(1);
+					assertEquals(v3.getParallelism(), inputs.length);
+					
+					int sumOfPartitions = 0;
+					for (ExecutionEdge2 inEdge : inputs) {
+						assertEquals(1, inEdge.getInputNum());
+						sumOfPartitions += inEdge.getSource().getPartition();
+					}
+					
+					assertEquals(1, sumOfPartitions);
+				}
+			}
+		}
+	}
+	
+	@Test
+	public void testCannotConnectMissingId() {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+		
+		// construct part one of the execution graph
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		v1.setParallelism(7);
+		
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		// attach the second part of the graph
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.BIPARTITE);
+		
+		List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v2));
+		
+		try {
+			eg.attachJobGraph(ordered2);
+			fail("Attached wrong jobgraph");
+		}
+		catch (JobException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCannotConnectWrongOrder() {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+		AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+		AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+		AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+		
+		v1.setParallelism(5);
+		v2.setParallelism(7);
+		v3.setParallelism(2);
+		v4.setParallelism(11);
+		v5.setParallelism(4);
+		
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+		
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+			fail("Attached wrong jobgraph");
+		}
+		catch (JobException e) {
+			// expected
+		}
+	}
+	
+	@Test
+	public void testSetupInputSplits() {
+		try {
+			final InputSplit[] emptySplits = new InputSplit[0];
+			
+			InputSplitAssigner assigner1 = mock(InputSplitAssigner.class);
+			InputSplitAssigner assigner2 = mock(InputSplitAssigner.class);
+			
+			@SuppressWarnings("unchecked")
+			InputSplitSource<InputSplit> source1 = mock(InputSplitSource.class);
+			@SuppressWarnings("unchecked")
+			InputSplitSource<InputSplit> source2 = mock(InputSplitSource.class);
+			
+			when(source1.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
+			when(source2.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
+			when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
+			when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);
+			
+			final JobID jobId = new JobID();
+			final String jobName = "Test Job Sample Name";
+			final Configuration cfg = new Configuration();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+			AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+			AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+			AbstractJobVertex v4 = new AbstractJobVertex("vertex4");
+			AbstractJobVertex v5 = new AbstractJobVertex("vertex5");
+			
+			v1.setParallelism(5);
+			v2.setParallelism(7);
+			v3.setParallelism(2);
+			v4.setParallelism(11);
+			v5.setParallelism(4);
+			
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+			v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+			v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
+			v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+			
+			v3.setInputSplitSource(source1);
+			v5.setInputSplitSource(source2);
+			
+			List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+			try {
+				eg.attachJobGraph(ordered);
+			}
+			catch (JobException e) {
+				e.printStackTrace();
+				fail("Job failed with exception: " + e.getMessage());
+			}
+			
+			assertEquals(assigner1, eg.getAllVertices().get(v3.getID()).getSplitAssigner());
+			assertEquals(assigner2, eg.getAllVertices().get(v5.getID()).getSplitAssigner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
new file mode 100644
index 0000000..9705dcd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ExecutionGraphDeploymentTest {
+	/** Deploys one subtask of a four-vertex job graph and checks the TaskDeploymentDescriptor handed to the mocked task manager. */
+	@Test
+	public void testBuildDeploymentDescriptor() {
+		try {
+			final JobID jobId = new JobID();
+			// fixed vertex ids, so the graph lookup and the mocked response below can refer to v2 by id
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			final JobVertexID jid3 = new JobVertexID();
+			final JobVertexID jid4 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			AbstractJobVertex v3 = new AbstractJobVertex("v3", jid3);
+			AbstractJobVertex v4 = new AbstractJobVertex("v4", jid4);
+			
+			v1.setParallelism(10);
+			v2.setParallelism(10);
+			v3.setParallelism(10);
+			v4.setParallelism(10);
+			
+			v1.setInvokableClass(RegularPactTask.class);
+			v2.setInvokableClass(RegularPactTask.class);
+			v3.setInvokableClass(RegularPactTask.class);
+			v4.setInvokableClass(RegularPactTask.class);
+			// topology: v1 -> v2, and v2 feeds both v3 and v4
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+			v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
+			// spy on the graph so that execute(Runnable) runs the given action directly in the calling thread
+			ExecutionGraph eg = spy(new ExecutionGraph(jobId, "some job", new Configuration()));
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) {
+					final Runnable parameter = (Runnable) invocation.getArguments()[0];
+					parameter.run();
+					return null;
+				}
+				
+			}).when(eg).execute(Matchers.any(Runnable.class));
+			
+			List<AbstractJobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+			
+			eg.attachJobGraph(ordered);
+			
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
+			ExecutionVertex2 vertex = ejv.getTaskVertices()[3]; // the subtask of v2 at index 3
+			
+			// holder that lets the anonymous Answer below hand the descriptor back to the test (atomicity is not required)
+			final AtomicReference<TaskDeploymentDescriptor> reference = new AtomicReference<TaskDeploymentDescriptor>();
+			
+			// mock the task manager: record the submitted descriptor and answer with a TaskOperationResult
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenAnswer(new Answer<TaskOperationResult>() {
+				@Override
+				public TaskOperationResult answer(InvocationOnMock invocation) {
+					final TaskDeploymentDescriptor parameter = (TaskDeploymentDescriptor) invocation.getArguments()[0];
+					reference.set(parameter);
+					return new TaskOperationResult(jid2, 0, true); // NOTE(review): second argument is 0 although subtask 3 is deployed - confirm its meaning
+				}
+			});
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(jobId);
+			// deploying to the slot is expected to take the vertex from CREATED to RUNNING
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			// the mocked task manager must have received a descriptor
+			TaskDeploymentDescriptor descr = reference.get();
+			assertNotNull(descr);
+			// the descriptor must identify subtask 3 (of 10) of vertex v2
+			assertEquals(jobId, descr.getJobID());
+			assertEquals(jid2, descr.getVertexID());
+			assertEquals(3, descr.getIndexInSubtaskGroup());
+			assertEquals(10, descr.getCurrentNumberOfSubtasks());
+			assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName());
+			assertEquals("v2", descr.getTaskName());
+			// v2 has two consumers (v3, v4) and one producer (v1)
+			assertEquals(2, descr.getOutputGates().size());
+			assertEquals(1, descr.getInputGates().size());
+			// every gate is expected to hold 10 channels; presumably one per connected subtask (parallelism 10, BIPARTITE) - confirm
+			assertEquals(10, descr.getOutputGates().get(0).getChannels().size());
+			assertEquals(10, descr.getOutputGates().get(1).getChannels().size());
+			assertEquals(10, descr.getInputGates().get(0).getChannels().size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
deleted file mode 100644
index 36e3640..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTest.java
+++ /dev/null
@@ -1,955 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.testutils.ServerTestUtils;
-import org.junit.Test;
-import org.apache.flink.api.java.io.DiscardingOuputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-/**
- * This class contains test concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects.
- * 
- */
-public class ExecutionGraphTest {
-	/*
-	 * input1 -> task1 -> output1
-	 * output1 shares instance with input1
-	 * input1 shares instance with task1
-	 * no subtasks defined
-	 * input1 is default, task1 is m1.large, output1 is m1.xlarge
-	 * no channel types defined
-	 */
-	@Test
-	public void testConvertJobGraphToExecutionGraph1() {
-
-		File inputFile = null;
-		JobID jobID = null;
-
-		try {
-			inputFile = ServerTestUtils.createInputFile(0);
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
-			i1.setNumberOfSubtasks(1);
-			i1.setInvokableClass(DataSourceTask.class);
-			TextInputFormat inputFormat = new TextInputFormat(new Path(inputFile.toURI()));
-			i1.setInputFormat(inputFormat);
-
-			// task vertex
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask1Input1Output.class);
-
-			// output vertex
-			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
-			o1.setNumberOfSubtasks(1);
-			o1.setInvokableClass(DataSinkTask.class);
-			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-
-			o1.setVertexToShareInstancesWith(i1);
-			i1.setVertexToShareInstancesWith(t1);
-
-			// connect vertices
-			i1.connectTo(t1);
-			t1.connectTo(o1);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			// test all methods of ExecutionGraph
-			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			assertEquals(1, executionStage.getMaxNumberSubtasks());
-
-			assertEquals(jobID, eg.getJobID());
-			assertEquals(0, eg.getIndexOfCurrentExecutionStage());
-			assertEquals(1, eg.getNumberOfInputVertices());
-			assertEquals(1, eg.getNumberOfOutputVertices());
-			assertEquals(1, eg.getNumberOfStages());
-			assertNotNull(eg.getInputVertex(0));
-			assertNull(eg.getInputVertex(1));
-			assertNotNull(eg.getOutputVertex(0));
-			assertNull(eg.getOutputVertex(1));
-			assertNotNull(eg.getStage(0));
-			assertNull(eg.getStage(1));
-
-			// test all methods of ExecutionStage stage0
-			ExecutionStage es = eg.getStage(0);
-
-			assertEquals(3, es.getNumberOfStageMembers());
-			assertEquals(0, es.getStageNumber());
-			assertNotNull(es.getStageMember(0));
-			assertNotNull(es.getStageMember(1));
-			assertNotNull(es.getStageMember(2));
-			assertNull(es.getStageMember(3));
-
-			// test all methods of ExecutionGroupVertex
-			ExecutionGroupVertex egv0 = null; // input1
-			ExecutionGroupVertex egv1 = null; // output1
-			ExecutionGroupVertex egv2 = null; // task1
-
-			if (es.getStageMember(0).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(0);
-			} else {
-				egv2 = es.getStageMember(0);
-			}
-
-			if (es.getStageMember(1).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(1);
-			} else {
-				egv2 = es.getStageMember(1);
-			}
-
-			if (es.getStageMember(2).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(2);
-			} else {
-				egv2 = es.getStageMember(2);
-			}
-
-			// egv0 (input1)
-			assertNull(egv0.getBackwardEdge(0));
-			assertNotNull(egv0.getConfiguration());
-			assertEquals(1, egv0.getCurrentNumberOfGroupMembers());
-			assertNotNull(egv0.getExecutionSignature());
-			assertEquals(es, egv0.getExecutionStage());
-			assertNotNull(egv0.getForwardEdge(0));
-			assertNull(egv0.getForwardEdge(1));
-			assertNotNull(egv0.getForwardEdges(egv2));
-			assertNotNull(egv0.getGroupMember(0));
-			assertNull(egv0.getGroupMember(1));
-			assertEquals(1, egv0.getInputSplits().length);
-			assertEquals("Input 1", egv0.getName());
-			assertEquals(0, egv0.getNumberOfBackwardLinks());
-			assertEquals(1, egv0.getNumberOfForwardLinks());
-			assertEquals(0, egv0.getStageNumber());
-			assertEquals(1, egv0.getUserDefinedNumberOfMembers());
-			assertEquals("Task 1", egv0.getVertexToShareInstancesWith().getName());
-
-			// egv1 (output1)
-			assertNotNull(egv1.getBackwardEdge(0));
-			assertNull(egv1.getBackwardEdge(1));
-			assertNotNull(egv1.getBackwardEdges(egv2));
-			assertNotNull(egv1.getConfiguration());
-			assertEquals(1, egv1.getCurrentNumberOfGroupMembers());
-			assertNotNull(egv1.getExecutionSignature());
-			assertEquals(es, egv1.getExecutionStage());
-			assertNull(egv1.getForwardEdge(0));
-			assertNotNull(egv1.getGroupMember(0));
-			assertNull(egv1.getGroupMember(1));
-			assertEquals("Output 1", egv1.getName());
-			assertEquals(1, egv1.getNumberOfBackwardLinks());
-			assertEquals(0, egv1.getNumberOfForwardLinks());
-			assertEquals(0, egv1.getStageNumber());
-			assertEquals(1, egv1.getUserDefinedNumberOfMembers());
-			assertEquals("Input 1", egv1.getVertexToShareInstancesWith().getName());
-
-			// egv2 (task1)
-			assertNotNull(egv2.getBackwardEdge(0));
-			assertNull(egv2.getBackwardEdge(1));
-			assertNotNull(egv2.getBackwardEdges(egv0));
-			assertNotNull(egv2.getConfiguration());
-			assertEquals(1, egv2.getCurrentNumberOfGroupMembers());
-			assertNotNull(egv2.getExecutionSignature());
-			assertEquals(es, egv2.getExecutionStage());
-			assertNotNull(egv2.getForwardEdge(0));
-			assertNull(egv2.getForwardEdge(1));
-			assertNotNull(egv2.getForwardEdges(egv1));
-			assertNotNull(egv2.getGroupMember(0));
-			assertNull(egv2.getGroupMember(1));
-			assertEquals("Task 1", egv2.getName());
-			assertEquals(1, egv2.getNumberOfBackwardLinks());
-			assertEquals(1, egv2.getNumberOfForwardLinks());
-			assertEquals(0, egv2.getStageNumber());
-			assertEquals(1, egv2.getUserDefinedNumberOfMembers());
-			assertNull(egv2.getVertexToShareInstancesWith());
-
-			// test all methods of ExecutionVertex
-			ExecutionVertex ev0 = egv0.getGroupMember(0); // input1
-			ExecutionVertex ev1 = egv1.getGroupMember(0); // output1
-			ExecutionVertex ev2 = egv2.getGroupMember(0); // task1
-
-			// ev0 (input1)
-			assertEquals(egv0, ev0.getGroupVertex());
-			assertNotNull(ev0.getID());
-			assertEquals("Input 1", ev0.getName());
-
-			// ev1 (output1)
-			assertEquals(egv1, ev1.getGroupVertex());
-			assertNotNull(ev1.getID());
-			assertEquals("Output 1", ev1.getName());
-
-			// ev2 (task1)
-			assertEquals(egv2, ev2.getGroupVertex());
-			assertNotNull(ev2.getID());
-			assertEquals("Task 1", ev2.getName());
-
-			assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
-			assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
-
-			// test channels
-			assertEquals(ChannelType.NETWORK, eg.getChannelType(ev0, ev2));
-			assertEquals(ChannelType.NETWORK, eg.getChannelType(ev2, ev1));
-
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} catch (IOException e) {
-			fail(e.getMessage());
-		} finally {
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-				}
-			}
-		}
-	}
-
-	/*
-	 * input1 -> task1 -> output1
-	 * no subtasks defined
-	 * input1 is default, task1 is m1.large, output1 is m1.xlarge
-	 * all channels are INMEMORY
-	 */
-	@Test
-	public void testConvertJobGraphToExecutionGraph2() {
-
-		File inputFile = null;
-		JobID jobID = null;
-
-		try {
-			inputFile = ServerTestUtils.createInputFile(0);
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
-			i1.setInvokableClass(DataSourceTask.class);
-			i1.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
-			i1.setNumberOfSubtasks(1);
-
-			// task vertex
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask1Input1Output.class);
-
-			// output vertex
-			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
-			o1.setNumberOfSubtasks(1);
-			o1.setInvokableClass(DataSinkTask.class);
-			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY);
-			t1.connectTo(o1, ChannelType.IN_MEMORY);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			// test instance types in ExecutionGraph
-			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			assertEquals(1, executionStage.getMaxNumberSubtasks());
-
-			// stage0
-			ExecutionStage es = eg.getStage(0);
-			ExecutionGroupVertex egv0 = null; // input1
-			ExecutionGroupVertex egv1 = null; // output1
-			ExecutionGroupVertex egv2 = null; // task1
-			if (es.getStageMember(0).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(0);
-			} else {
-				egv2 = es.getStageMember(0);
-			}
-			if (es.getStageMember(1).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(1);
-			} else {
-				egv2 = es.getStageMember(1);
-			}
-			if (es.getStageMember(2).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Output 1")) {
-				egv1 = es.getStageMember(2);
-			} else {
-				egv2 = es.getStageMember(2);
-			}
-			ExecutionVertex ev0 = egv0.getGroupMember(0); // input1
-			ExecutionVertex ev1 = egv1.getGroupMember(0); // output1
-			ExecutionVertex ev2 = egv2.getGroupMember(0); // task1
-			assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
-			assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (IOException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} finally {
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-				}
-			}
-		}
-	}
-
-	/*
-	 * input1 -> task1 ->
-	 * task3 -> output1
-	 * input2 -> task2 ->
-	 * each vertex has 2 subtasks
-	 * no instance types defined
-	 * no channel types defined
-	 */
-	@Test
-	public void testConvertJobGraphToExecutionGraph3() {
-
-		File inputFile1 = null;
-		File inputFile2 = null;
-		File outputFile = null;
-		JobID jobID = null;
-
-		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(0);
-			inputFile2 = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getRandomFilename());
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
-			i1.setInvokableClass(DataSourceTask.class);
-			i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
-			i1.setNumberOfSubtasks(2);
-			
-			final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
-			i2.setInvokableClass(DataSourceTask.class);
-			i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
-			i2.setNumberOfSubtasks(2);
-
-			// task vertex
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask1Input1Output.class);
-			t1.setNumberOfSubtasks(2);
-			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setInvokableClass(ForwardTask1Input1Output.class);
-			t2.setNumberOfSubtasks(2);
-			final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
-			t3.setInvokableClass(ForwardTask2Inputs1Output.class);
-			t3.setNumberOfSubtasks(2);
-
-			
-			// output vertex
-			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
-			o1.setInvokableClass(DataSinkTask.class);
-			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-			o1.setNumberOfSubtasks(2);
-			i1.setVertexToShareInstancesWith(t1);
-			t1.setVertexToShareInstancesWith(t3);
-			i2.setVertexToShareInstancesWith(t2);
-			t2.setVertexToShareInstancesWith(t3);
-			t3.setVertexToShareInstancesWith(o1);
-
-			// connect vertices
-			i1.connectTo(t1);
-			i2.connectTo(t2);
-			t1.connectTo(t3);
-			t2.connectTo(t3);
-			t3.connectTo(o1);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			// test instance types in ExecutionGraph
-			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			assertEquals(2, executionStage.getMaxNumberSubtasks());
-
-			// stage0
-			final ExecutionStage es = eg.getStage(0);
-			ExecutionGroupVertex egv0 = null; // input1
-			ExecutionGroupVertex egv1 = null; // input2
-			ExecutionGroupVertex egv2 = null; // task1
-			ExecutionGroupVertex egv3 = null; // task2
-			ExecutionGroupVertex egv4 = null; // task3
-			ExecutionGroupVertex egv5 = null; // output1
-			if (es.getStageMember(0).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(0);
-			} else if (es.getStageMember(0).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(0);
-			} else {
-				egv5 = es.getStageMember(0);
-			}
-
-			if (es.getStageMember(1).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(1);
-			} else if (es.getStageMember(1).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(1);
-			} else {
-				egv5 = es.getStageMember(1);
-			}
-			if (es.getStageMember(2).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(2);
-			} else if (es.getStageMember(2).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(2);
-			} else {
-				egv5 = es.getStageMember(2);
-			}
-			if (es.getStageMember(3).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(3);
-			} else if (es.getStageMember(3).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(3);
-			} else if (es.getStageMember(3).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(3);
-			} else if (es.getStageMember(3).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(3);
-			} else if (es.getStageMember(3).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(3);
-			} else {
-				egv5 = es.getStageMember(3);
-			}
-			if (es.getStageMember(4).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(4);
-			} else if (es.getStageMember(4).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(4);
-			} else if (es.getStageMember(4).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(4);
-			} else if (es.getStageMember(4).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(4);
-			} else if (es.getStageMember(4).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(4);
-			} else {
-				egv5 = es.getStageMember(4);
-			}
-			if (es.getStageMember(5).getName().equals("Input 1")) {
-				egv0 = es.getStageMember(5);
-			} else if (es.getStageMember(5).getName().equals("Input 2")) {
-				egv1 = es.getStageMember(5);
-			} else if (es.getStageMember(5).getName().equals("Task 1")) {
-				egv2 = es.getStageMember(5);
-			} else if (es.getStageMember(5).getName().equals("Task 2")) {
-				egv3 = es.getStageMember(5);
-			} else if (es.getStageMember(5).getName().equals("Task 3")) {
-				egv4 = es.getStageMember(5);
-			} else {
-				egv5 = es.getStageMember(5);
-			}
-			final ExecutionVertex i1_0 = egv0.getGroupMember(0); // input1
-			final ExecutionVertex i1_1 = egv0.getGroupMember(1); // input1
-			final ExecutionVertex i2_0 = egv1.getGroupMember(0); // input2
-			final ExecutionVertex i2_1 = egv1.getGroupMember(1); // input2
-			final ExecutionVertex t1_0 = egv2.getGroupMember(0); // task1
-			final ExecutionVertex t1_1 = egv2.getGroupMember(1); // task1
-			final ExecutionVertex t2_0 = egv3.getGroupMember(0); // task2
-			final ExecutionVertex t2_1 = egv3.getGroupMember(1); // task2
-			final ExecutionVertex t3_0 = egv4.getGroupMember(0); // task3
-			final ExecutionVertex t3_1 = egv4.getGroupMember(1); // task3
-			final ExecutionVertex o1_0 = egv5.getGroupMember(0); // output1
-			final ExecutionVertex o1_1 = egv5.getGroupMember(1); // otuput1
-
-			// instance 1
-			assertTrue((t1_0.getAllocatedResource().equals(i1_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
-				.equals(i1_1.getAllocatedResource()))
-				|| (!t1_0.getAllocatedResource().equals(i1_0.getAllocatedResource()) && t1_0.getAllocatedResource()
-					.equals(i1_1.getAllocatedResource())));
-			assertTrue((t1_0.getAllocatedResource().equals(i2_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
-				.equals(i2_1.getAllocatedResource()))
-				|| (!t1_0.getAllocatedResource().equals(i2_0.getAllocatedResource()) && t1_0.getAllocatedResource()
-					.equals(i2_1.getAllocatedResource())));
-			assertTrue((t1_0.getAllocatedResource().equals(t2_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
-				.equals(t2_1.getAllocatedResource()))
-				|| (!t1_0.getAllocatedResource().equals(t2_0.getAllocatedResource()) && t1_0.getAllocatedResource()
-					.equals(t2_1.getAllocatedResource())));
-			assertTrue((t1_0.getAllocatedResource().equals(t3_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
-				.equals(t3_1.getAllocatedResource()))
-				|| (!t1_0.getAllocatedResource().equals(t3_0.getAllocatedResource()) && t1_0.getAllocatedResource()
-					.equals(t3_1.getAllocatedResource())));
-			assertTrue((t1_0.getAllocatedResource().equals(o1_0.getAllocatedResource()) && !t1_0.getAllocatedResource()
-				.equals(o1_1.getAllocatedResource()))
-				|| (!t1_0.getAllocatedResource().equals(o1_0.getAllocatedResource()) && t1_0.getAllocatedResource()
-					.equals(o1_1.getAllocatedResource())));
-			// instance 2
-			assertTrue((t1_1.getAllocatedResource().equals(i1_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
-				.equals(i1_1.getAllocatedResource()))
-				|| (!t1_1.getAllocatedResource().equals(i1_0.getAllocatedResource()) && t1_1.getAllocatedResource()
-					.equals(i1_1.getAllocatedResource())));
-			assertTrue((t1_1.getAllocatedResource().equals(i2_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
-				.equals(i2_1.getAllocatedResource()))
-				|| (!t1_1.getAllocatedResource().equals(i2_0.getAllocatedResource()) && t1_1.getAllocatedResource()
-					.equals(i2_1.getAllocatedResource())));
-			assertTrue((t1_1.getAllocatedResource().equals(t2_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
-				.equals(t2_1.getAllocatedResource()))
-				|| (!t1_1.getAllocatedResource().equals(t2_0.getAllocatedResource()) && t1_1.getAllocatedResource()
-					.equals(t2_1.getAllocatedResource())));
-			assertTrue((t1_1.getAllocatedResource().equals(t3_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
-				.equals(t3_1.getAllocatedResource()))
-				|| (!t1_1.getAllocatedResource().equals(t3_0.getAllocatedResource()) && t1_1.getAllocatedResource()
-					.equals(t3_1.getAllocatedResource())));
-			assertTrue((t1_1.getAllocatedResource().equals(o1_0.getAllocatedResource()) && !t1_1.getAllocatedResource()
-				.equals(o1_1.getAllocatedResource()))
-				|| (!t1_1.getAllocatedResource().equals(o1_0.getAllocatedResource()) && t1_1.getAllocatedResource()
-					.equals(o1_1.getAllocatedResource())));
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (IOException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} finally {
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (inputFile2 != null) {
-				inputFile2.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException ioe) {
-				}
-			}
-		}
-	}
-
-	/*
-	 * input1 -> task1 -> output1
-	 * -> task3 -> task4
-	 * input2 -> task2 -> output2
-	 * all subtasks defined
-	 * all instance types defined
-	 * all channel types defined
-	 */
-	@Test
-	public void testConvertJobGraphToExecutionGraph4() {
-
-		File inputFile1 = null;
-		File inputFile2 = null;
-		File outputFile1 = null;
-		File outputFile2 = null;
-		JobID jobID = null;
-
-		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(0);
-			inputFile2 = ServerTestUtils.createInputFile(0);
-			outputFile1 = new File(ServerTestUtils.getRandomFilename());
-			outputFile2 = new File(ServerTestUtils.getRandomFilename());
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Job Graph 1");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex i1 = new JobInputVertex("Input 1", jg);
-			i1.setInvokableClass(DataSourceTask.class);
-			i1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
-			i1.setNumberOfSubtasks(4);
-			final JobInputVertex i2 = new JobInputVertex("Input 2", jg);
-			i2.setInvokableClass(DataSourceTask.class);
-			i2.setInputFormat(new TextInputFormat(new Path(inputFile2.toURI())));
-			i2.setNumberOfSubtasks(4);
-			// task vertex
-			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
-			t1.setInvokableClass(ForwardTask1Input1Output.class);
-			t1.setNumberOfSubtasks(4);
-			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
-			t2.setInvokableClass(ForwardTask1Input1Output.class);
-			t2.setNumberOfSubtasks(4);
-			final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
-			t3.setInvokableClass(ForwardTask2Inputs1Output.class);
-			t3.setNumberOfSubtasks(8);
-			final JobTaskVertex t4 = new JobTaskVertex("Task 4", jg);
-			t4.setInvokableClass(ForwardTask1Input2Outputs.class);
-			t4.setNumberOfSubtasks(8);
-			// output vertex
-			final JobOutputVertex o1 = new JobOutputVertex("Output 1", jg);
-			o1.setInvokableClass(DataSinkTask.class);
-			o1.setOutputFormat(new DiscardingOuputFormat<Object>());
-			o1.setNumberOfSubtasks(4);
-			final JobOutputVertex o2 = new JobOutputVertex("Output 2", jg);
-			o2.setInvokableClass(DataSinkTask.class);
-			o2.setOutputFormat(new DiscardingOuputFormat<Object>());
-			o2.setNumberOfSubtasks(4);
-			o1.setVertexToShareInstancesWith(o2);
-
-			// connect vertices
-			i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			i2.connectTo(t2, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			t1.connectTo(t3, ChannelType.NETWORK);
-			t2.connectTo(t3, ChannelType.NETWORK);
-			t3.connectTo(t4, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			t4.connectTo(o1, ChannelType.NETWORK);
-			t4.connectTo(o2, ChannelType.NETWORK);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			// test instance types in ExecutionGraph
-			ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			assertNotNull(executionStage);
-			assertEquals(0, executionStage.getStageNumber());
-			
-			assertEquals(20, executionStage.getRequiredSlots());
-			// Fake transition to next stage by triggering execution state changes manually
-			final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(),
-				true, true);
-
-			while (it.hasNext()) {
-				final ExecutionVertex ev = it.next();
-				ev.updateExecutionState(ExecutionState.SCHEDULED);
-				ev.updateExecutionState(ExecutionState.ASSIGNED);
-				ev.updateExecutionState(ExecutionState.READY);
-				ev.updateExecutionState(ExecutionState.STARTING);
-				ev.updateExecutionState(ExecutionState.RUNNING);
-				ev.updateExecutionState(ExecutionState.FINISHING);
-				ev.updateExecutionState(ExecutionState.FINISHED);
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (inputFile2 != null) {
-				inputFile2.delete();
-			}
-			if (outputFile1 != null) {
-				outputFile1.delete();
-			}
-			if (outputFile2 != null) {
-				outputFile2.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-				}
-			}
-		}
-	}
-
-	/**
-	 * Tests the conversion of a job graph representing a self cross to an execution graph.
-	 */
-	@Test
-	public void testConvertSelfCross() {
-
-		final String inputTaskName = "Self Cross Input";
-		final String crossTaskName = "Self Cross Task";
-		final String outputTaskName = "Self Cross Output";
-		final int degreeOfParallelism = 4;
-		File inputFile = null;
-		File outputFile = null;
-		JobID jobID = null;
-
-		try {
-			inputFile = ServerTestUtils.createInputFile(0);
-			outputFile = new File(ServerTestUtils.getRandomFilename());
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Self Cross Test Job");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex input = new JobInputVertex(inputTaskName, jg);
-			input.setInvokableClass(DataSourceTask.class);
-			input.setInputFormat(new TextInputFormat(new Path(inputFile.toURI())));
-			input.setNumberOfSubtasks(degreeOfParallelism);
-
-			// cross vertex
-			final JobTaskVertex cross = new JobTaskVertex(crossTaskName, jg);
-			cross.setInvokableClass(SelfCrossForwardTask.class);
-			cross.setNumberOfSubtasks(degreeOfParallelism);
-
-			// output vertex
-			final JobOutputVertex output = new JobOutputVertex(outputTaskName, jg);
-			output.setInvokableClass(DataSinkTask.class);
-			output.setOutputFormat(new DiscardingOuputFormat<Object>());
-			output.setNumberOfSubtasks(degreeOfParallelism);
-
-			// connect vertices
-			input.connectTo(cross, ChannelType.IN_MEMORY, 0, 0,
-				DistributionPattern.POINTWISE);
-			input.connectTo(cross, ChannelType.NETWORK, 1, 1,
-				DistributionPattern.BIPARTITE);
-			cross.connectTo(output, ChannelType.IN_MEMORY, 0, 0,
-				DistributionPattern.POINTWISE);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			assertEquals(1, eg.getNumberOfStages());
-
-			final ExecutionStage stage = eg.getStage(0);
-
-			assertEquals(3, stage.getNumberOfStageMembers());
-
-			ExecutionGroupVertex inputGroupVertex = null;
-			ExecutionGroupVertex crossGroupVertex = null;
-			ExecutionGroupVertex outputGroupVertex = null;
-			final ExecutionGroupVertexIterator groupIt = new ExecutionGroupVertexIterator(eg, true, -1);
-			while (groupIt.hasNext()) {
-
-				ExecutionGroupVertex gv = groupIt.next();
-				if (inputTaskName.equals(gv.getName())) {
-					inputGroupVertex = gv;
-				} else if (crossTaskName.equals(gv.getName())) {
-					crossGroupVertex = gv;
-				} else if (outputTaskName.equals(gv.getName())) {
-					outputGroupVertex = gv;
-				}
-			}
-
-			assertNotNull(inputGroupVertex);
-			assertNotNull(crossGroupVertex);
-			assertNotNull(outputGroupVertex);
-
-			assertEquals(degreeOfParallelism, inputGroupVertex.getCurrentNumberOfGroupMembers());
-			assertEquals(degreeOfParallelism, crossGroupVertex.getCurrentNumberOfGroupMembers());
-			assertEquals(degreeOfParallelism, outputGroupVertex.getCurrentNumberOfGroupMembers());
-
-			// Check that all subtasks on a pipeline share the same instance
-			assertEquals(inputGroupVertex.getGroupMember(0).getAllocatedResource(), crossGroupVertex.getGroupMember(0)
-				.getAllocatedResource());
-			assertEquals(inputGroupVertex.getGroupMember(1).getAllocatedResource(), crossGroupVertex.getGroupMember(1)
-				.getAllocatedResource());
-			assertEquals(inputGroupVertex.getGroupMember(2).getAllocatedResource(), crossGroupVertex.getGroupMember(2)
-				.getAllocatedResource());
-			assertEquals(inputGroupVertex.getGroupMember(3).getAllocatedResource(), crossGroupVertex.getGroupMember(3)
-				.getAllocatedResource());
-
-			assertEquals(crossGroupVertex.getGroupMember(0).getAllocatedResource(), outputGroupVertex.getGroupMember(0)
-				.getAllocatedResource());
-			assertEquals(crossGroupVertex.getGroupMember(1).getAllocatedResource(), outputGroupVertex.getGroupMember(1)
-				.getAllocatedResource());
-			assertEquals(crossGroupVertex.getGroupMember(2).getAllocatedResource(), outputGroupVertex.getGroupMember(2)
-				.getAllocatedResource());
-			assertEquals(crossGroupVertex.getGroupMember(3).getAllocatedResource(), outputGroupVertex.getGroupMember(3)
-				.getAllocatedResource());
-
-			// Check that all subtasks on different pipelines run on different instances
-			assertFalse(inputGroupVertex.getGroupMember(0).getAllocatedResource()
-				.equals(inputGroupVertex.getGroupMember(1).getAllocatedResource()));
-			assertFalse(inputGroupVertex.getGroupMember(1).getAllocatedResource()
-				.equals(inputGroupVertex.getGroupMember(2).getAllocatedResource()));
-			assertFalse(inputGroupVertex.getGroupMember(2).getAllocatedResource()
-				.equals(inputGroupVertex.getGroupMember(3).getAllocatedResource()));
-
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-			if (inputFile != null) {
-				inputFile.delete();
-			}
-			if (outputFile != null) {
-				outputFile.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-				}
-			}
-		}
-	}
-
-	/**
-	 * This test checks the correctness of the instance sharing API. In particular, the test checks the behavior of the
-	 * instance sharing as reported broken in ticket #198
-	 */
-	@Test
-	public void testInstanceSharing() {
-
-		final int degreeOfParallelism = 4;
-		File inputFile1 = null;
-		File outputFile1 = null;
-		JobID jobID = null;
-
-		try {
-
-			inputFile1 = ServerTestUtils.createInputFile(0);
-			outputFile1 = new File(ServerTestUtils.getRandomFilename());
-
-			// create job graph
-			final JobGraph jg = new JobGraph("Instance Sharing Test Job");
-			jobID = jg.getJobID();
-
-			// input vertex
-			final JobInputVertex input1 = new JobInputVertex("Input 1", jg);
-			input1.setInvokableClass(DataSourceTask.class);
-			input1.setInputFormat(new TextInputFormat(new Path(inputFile1.toURI())));
-			input1.setNumberOfSubtasks(degreeOfParallelism);
-			
-			
-
-			// forward vertex 1
-			final JobTaskVertex forward1 = new JobTaskVertex("Forward 1", jg);
-			forward1.setInvokableClass(ForwardTask1Input1Output.class);
-			forward1.setNumberOfSubtasks(degreeOfParallelism);
-
-			// forward vertex 2
-			final JobTaskVertex forward2 = new JobTaskVertex("Forward 2", jg);
-			forward2.setInvokableClass(ForwardTask1Input1Output.class);
-			forward2.setNumberOfSubtasks(degreeOfParallelism);
-
-			// forward vertex 3
-			final JobTaskVertex forward3 = new JobTaskVertex("Forward 3", jg);
-			forward3.setInvokableClass(ForwardTask1Input1Output.class);
-			forward3.setNumberOfSubtasks(degreeOfParallelism);
-
-			// output vertex
-			final JobOutputVertex output1 = new JobOutputVertex("Output 1", jg);
-			output1.setInvokableClass(DataSinkTask.class);
-			output1.setOutputFormat(new DiscardingOuputFormat<Object>());
-			output1.setNumberOfSubtasks(degreeOfParallelism);
-
-			// connect vertices
-			input1.connectTo(forward1, ChannelType.IN_MEMORY,
-				DistributionPattern.POINTWISE);
-			forward1.connectTo(forward2, ChannelType.IN_MEMORY,
-					DistributionPattern.POINTWISE);
-			forward2.connectTo(forward3, ChannelType.NETWORK,
-					DistributionPattern.POINTWISE);
-			forward3.connectTo(output1, ChannelType.IN_MEMORY);
-
-			// setup instance sharing
-			input1.setVertexToShareInstancesWith(forward1);
-			forward1.setVertexToShareInstancesWith(forward2);
-			forward2.setVertexToShareInstancesWith(forward3);
-			forward3.setVertexToShareInstancesWith(output1);
-
-			LibraryCacheManager.register(jobID, new String[0]);
-
-			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
-
-			// Check number of stages
-			assertEquals(1, eg.getNumberOfStages());
-
-			// Check number of vertices in stage
-			final ExecutionStage stage = eg.getStage(0);
-			assertEquals(5, stage.getNumberOfStageMembers());
-
-			final int numberOfRequiredSlots = stage.getMaxNumberSubtasks();
-			assertEquals(degreeOfParallelism, numberOfRequiredSlots);
-
-		} catch (GraphConversionException e) {
-			fail(e.getMessage());
-		} catch (JobGraphDefinitionException e) {
-			fail(e.getMessage());
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-			if (inputFile1 != null) {
-				inputFile1.delete();
-			}
-			if (outputFile1 != null) {
-				outputFile1.delete();
-			}
-			if (jobID != null) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
new file mode 100644
index 0000000..2207475
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class ExecutionGraphTestUtils {
+
+	// --------------------------------------------------------------------------------------------
+	//  state modifications
+	// --------------------------------------------------------------------------------------------
+	
+	/** Test hook: overwrites the declared "state" field of the vertex via reflection, bypassing access checks. */
+	public static void setVertexState(ExecutionVertex2 vertex, ExecutionState2 state) {
+		try {
+			final Field stateField = ExecutionVertex2.class.getDeclaredField("state");
+			stateField.setAccessible(true);
+			stateField.set(vertex, state);
+		} catch (Exception e) {
+			throw new RuntimeException("Modifying the state failed", e);
+		}
+	}
+	
+	/** Test hook: overwrites the declared "assignedSlot" field of the vertex via reflection, bypassing access checks. */
+	public static void setVertexResource(ExecutionVertex2 vertex, AllocatedSlot slot) {
+		try {
+			final Field slotField = ExecutionVertex2.class.getDeclaredField("assignedSlot");
+			slotField.setAccessible(true);
+			slotField.set(vertex, slot);
+		} catch (Exception e) {
+			throw new RuntimeException("Modifying the slot failed", e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  utility mocking methods
+	// --------------------------------------------------------------------------------------------
+	
+	/** Builds an Instance addressed at 127.0.0.1 whose getTaskManagerProxy() always hands back the supplied protocol. */
+	public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+		final HardwareDescription hardware = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+		final InstanceConnectionInfo connectionInfo =
+			new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10000, 10001);
+		return new Instance(connectionInfo, new InstanceID(), hardware, 1) {
+			@Override
+			public TaskOperationProtocol getTaskManagerProxy() {
+				return top;
+			}
+		};
+	}
+	
+	/** Returns a spied job vertex whose execute(Runnable) is stubbed to drop the action without running it. */
+	public static ExecutionJobVertex getJobVertexNotExecuting(JobVertexID id) throws JobException {
+		final ExecutionJobVertex spiedVertex = getJobVertexBase(id);
+		final Answer<Void> discardAction = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				return null;
+			}
+		};
+		doAnswer(discardAction).when(spiedVertex).execute(Matchers.any(Runnable.class));
+		return spiedVertex;
+	}
+	
+	/** Returns a spied job vertex whose execute(Runnable) is stubbed to run the action inline, on the thread calling execute. */
+	public static ExecutionJobVertex getJobVertexExecutingSynchronously(JobVertexID id) throws JobException {
+		final ExecutionJobVertex spiedVertex = getJobVertexBase(id);
+		final Answer<Void> runInline = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				final Runnable action = (Runnable) invocation.getArguments()[0];
+				action.run();
+				return null;
+			}
+		};
+		doAnswer(runInline).when(spiedVertex).execute(Matchers.any(Runnable.class));
+		return spiedVertex;
+	}
+	
+	/** Returns a spied job vertex whose execute(Runnable) is stubbed to start a new Thread for every submitted action. */
+	public static ExecutionJobVertex getJobVertexExecutingAsynchronously(JobVertexID id) throws JobException {
+		final ExecutionJobVertex spiedVertex = getJobVertexBase(id);
+		final Answer<Void> runInNewThread = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				final Runnable action = (Runnable) invocation.getArguments()[0];
+				new Thread(action).start();
+				return null;
+			}
+		};
+		doAnswer(runInNewThread).when(spiedVertex).execute(Matchers.any(Runnable.class));
+		return spiedVertex;
+	}
+	
+	/**
+	 * Returns a spied job vertex whose execute(Runnable) is stubbed to park the action in the given queue instead of running it.
+	 */
+	public static ExecutionJobVertex getJobVertexExecutingTriggered(JobVertexID id, final ActionQueue queue) throws JobException {
+		final ExecutionJobVertex spiedVertex = getJobVertexBase(id);
+		final Answer<Void> enqueueAction = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				queue.queueAction((Runnable) invocation.getArguments()[0]);
+				return null;
+			}
+		};
+		doAnswer(enqueueAction).when(spiedVertex).execute(Matchers.any(Runnable.class));
+		return spiedVertex;
+	}
+	
+	/** Wraps a fresh ExecutionJobVertex for a "TestVertex" with the given id in a Mockito spy so its methods can be stubbed. */
+	private static ExecutionJobVertex getJobVertexBase(JobVertexID id) throws JobException {
+		final AbstractJobVertex jobVertex = new AbstractJobVertex("TestVertex", id);
+		jobVertex.setInvokableClass(mock(AbstractInvokable.class).getClass());
+		final ExecutionGraph owningGraph = new ExecutionGraph(new JobID(), "test job", new Configuration());
+		final ExecutionJobVertex realVertex = new ExecutionJobVertex(owningGraph, jobVertex, 1);
+		return spy(realVertex);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** FIFO holder for deferred actions, so that a test decides when (and whether) each queued Runnable runs. */
+	public static final class ActionQueue {
+		private final LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<Runnable>();
+
+		/** Removes the oldest queued action and runs it; remove() throws NoSuchElementException if nothing is queued. */
+		public void triggerNextAction() {
+			pending.remove().run();
+		}
+
+		/** Removes and returns the oldest queued action without running it; throws NoSuchElementException if empty. */
+		public Runnable popNextAction() {
+			return pending.remove();
+		}
+
+		public void queueAction(Runnable r) {
+			pending.add(r);
+		}
+	}
+}