You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:16 UTC

[52/63] [abbrv] git commit: Adjust tests to new JobGraphModel

Adjust tests to new JobGraphModel


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/caa4ebef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/caa4ebef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/caa4ebef

Branch: refs/heads/master
Commit: caa4ebef82d84b5b4ffb945f90c06e0e82b9a102
Parents: e56d883
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 16:54:38 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:50 2014 +0200

----------------------------------------------------------------------
 .../client/program/PackagedProgramTest.java     |   1 -
 .../runtime/jobgraph/AbstractJobVertex.java     |   6 +-
 .../runtime/jobgraph/JobManagerTestUtils.java   |  89 +++++++++
 .../jobmanager/CoLocationConstraintITCase.java  | 111 +++++++++++
 .../runtime/jobmanager/JobManagerITCase.java    | 148 +--------------
 .../runtime/jobmanager/SlotSharingITCase.java   | 186 +++++++++++++++++++
 .../tasks/AgnosticBinaryReceiver.java           |  41 ++++
 .../jobmanager/tasks/AgnosticReceiver.java      |  38 ++++
 .../runtime/jobmanager/tasks/Receiver.java      |  44 +++++
 .../flink/runtime/jobmanager/tasks/Sender.java  |  46 +++++
 .../BroadcastVarsNepheleITCase.java             |  26 +--
 .../KMeansIterativeNepheleITCase.java           |  29 +--
 .../test/cancelling/CancellingTestBase.java     |   1 -
 .../test/iterative/nephele/JobGraphUtils.java   |  48 ++---
 .../test/runtime/NetworkStackThroughput.java    |  54 +++---
 flink-tests/src/test/resources/logback-test.xml |   4 +-
 16 files changed, 655 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 372c65b..4adfdb8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -31,7 +31,6 @@ public class PackagedProgramTest {
 	@Test
 	public void testGetPreviewPlan() {
 		try {
-			
 			PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
 			Assert.assertNotNull(prog.getPreviewPlan());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 82823b2..d2462ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -62,11 +62,13 @@ public class AbstractJobVertex implements java.io.Serializable {
 	/** Optionally, a source of input splits */
 	private InputSplitSource<?> inputSplitSource;
 	
+	/** The name of the vertex */
+	private String name;
+	
 	/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
 	private SlotSharingGroup slotSharingGroup;
 	
-	/** The name of the vertex */
-	private String name;
+//	private AbstractJobVertex coLocatedWith
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
new file mode 100644
index 0000000..14a73e1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+public class JobManagerTestUtils {
+
+	public static final JobManager startJobManager(int numSlots) throws Exception {
+		Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+		
+		GlobalConfiguration.includeConfiguration(cfg);
+		
+		JobManager jm = new JobManager(ExecutionMode.LOCAL);
+		
+		// we need to wait until the taskmanager is registered
+		// max time is 5 seconds
+		long deadline = System.currentTimeMillis() + 5000;
+		
+		while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+			Thread.sleep(10);
+		}
+		
+		assertEquals(numSlots, jm.getAvailableSlots());
+		
+		return jm;
+	}
+	
+	public static int getAvailablePort() throws IOException {
+		for (int i = 0; i < 50; i++) {
+			ServerSocket serverSocket = null;
+			try {
+				serverSocket = new ServerSocket(0);
+				int port = serverSocket.getLocalPort();
+				if (port != 0) {
+					return port;
+				}
+			} finally {
+				serverSocket.close();
+			}
+		}
+		
+		throw new IOException("could not find free port");
+	}
+	
+	public static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
+		Thread[] threads = new Thread[Thread.activeCount()];
+		Thread.enumerate(threads);
+		
+		for (Thread t : threads) {
+			if (t == null) {
+				continue;
+			}
+			ThreadGroup tg = t.getThreadGroup();
+			if (tg != null && tg.getName() != null && tg.getName().equals("Task Threads")) {
+				t.join();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
new file mode 100644
index 0000000..9bda8ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+public class CoLocationConstraintITCase {
+
+	
+	/**
+	 * This job runs in N slots with N senders and N receivers. Unless slot sharing is used, it cannot complete.
+	 */
+	@Test
+	public void testForwardJob() {
+		
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index f4d74a3..44d1c11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -42,8 +37,12 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticBinaryReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticReceiver;
 import org.apache.flink.runtime.jobmanager.tasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.Test;
 
@@ -837,144 +836,9 @@ public class JobManagerITCase {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
-	private static final JobManager startJobManager(int numSlots) throws Exception {
-		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
-		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
-		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
-		
-		GlobalConfiguration.includeConfiguration(cfg);
-		
-		JobManager jm = new JobManager(ExecutionMode.LOCAL);
-		
-		// we need to wait until the taskmanager is registered
-		// max time is 5 seconds
-		long deadline = System.currentTimeMillis() + 5000;
-		
-		while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
-			Thread.sleep(10);
-		}
-		
-		assertEquals(numSlots, jm.getAvailableSlots());
-		
-		return jm;
-	}
-	
-	private static int getAvailablePort() throws IOException {
-		for (int i = 0; i < 50; i++) {
-			ServerSocket serverSocket = null;
-			try {
-				serverSocket = new ServerSocket(0);
-				int port = serverSocket.getLocalPort();
-				if (port != 0) {
-					return port;
-				}
-			} finally {
-				serverSocket.close();
-			}
-		}
-		
-		throw new IOException("could not find free port");
-	}
-	
-	private static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
-		Thread[] threads = new Thread[Thread.activeCount()];
-		Thread.enumerate(threads);
-		
-		for (Thread t : threads) {
-			if (t == null) {
-				continue;
-			}
-			ThreadGroup tg = t.getThreadGroup();
-			if (tg != null && tg.getName() != null && tg.getName().equals("Task Threads")) {
-				t.join();
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
 	//  Simple test tasks
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class Sender extends AbstractInvokable {
-
-		private RecordWriter<IntegerRecord> writer;
-		
-		@Override
-		public void registerInputOutput() {
-			writer = new RecordWriter<IntegerRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			try {
-				writer.initializeSerializers();
-				writer.emit(new IntegerRecord(42));
-				writer.emit(new IntegerRecord(1337));
-				writer.flush();
-			}
-			finally {
-				writer.clearBuffers();
-			}
-		}
-	}
-	
-	public static final class Receiver extends AbstractInvokable {
-
-		private RecordReader<IntegerRecord> reader;
-		
-		@Override
-		public void registerInputOutput() {
-			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			IntegerRecord i1 = reader.next();
-			IntegerRecord i2 = reader.next();
-			IntegerRecord i3 = reader.next();
-			
-			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
-				throw new Exception("Wrong Data Received");
-			}
-		}
-	}
-	
-	public static final class AgnosticReceiver extends AbstractInvokable {
-
-		private RecordReader<IntegerRecord> reader;
-		
-		@Override
-		public void registerInputOutput() {
-			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			while (reader.next() != null);
-		}
-	}
-	
-	public static final class AgnosticBinaryReceiver extends AbstractInvokable {
-
-		private RecordReader<IntegerRecord> reader1;
-		private RecordReader<IntegerRecord> reader2;
-		
-		@Override
-		public void registerInputOutput() {
-			reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-			reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			while (reader1.next() != null);
-			while (reader2.next() != null);
-		}
-	}
-	
 	public static final class ExceptionSender extends AbstractInvokable {
 
 		private RecordWriter<IntegerRecord> writer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
new file mode 100644
index 0000000..98abc8d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.AgnosticBinaryReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Receiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+public class SlotSharingITCase {
+
+	
+	/**
+	 * This job runs in N slots with N senders and N receivers. Unless slot sharing is used, it cannot complete.
+	 */
+	@Test
+	public void testForwardJob() {
+		
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(Receiver.class);
+			
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used, it cannot complete.
+	 */
+	@Test
+	public void testTwoInputJob() {
+		
+		final int NUM_TASKS = 11;
+		
+		try {
+			final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+			final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender1.setInvokableClass(Sender.class);
+			sender2.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(AgnosticBinaryReceiver.class);
+			
+			sender1.setParallelism(NUM_TASKS);
+			sender2.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
+			sender1.setSlotSharingGroup(sharingGroup);
+			sender2.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);;
+			
+			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+			
+			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+			
+			JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+								.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
new file mode 100644
index 0000000..3784205
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticBinaryReceiver.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class AgnosticBinaryReceiver extends AbstractInvokable {
+
+	private RecordReader<IntegerRecord> reader1;
+	private RecordReader<IntegerRecord> reader2;
+	
+	@Override
+	public void registerInputOutput() {
+		reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (reader1.next() != null);
+		while (reader2.next() != null);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
new file mode 100644
index 0000000..ce38b46
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/AgnosticReceiver.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class AgnosticReceiver extends AbstractInvokable {
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (reader.next() != null);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
new file mode 100644
index 0000000..298673a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Receiver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class Receiver extends AbstractInvokable {
+
+	private RecordReader<IntegerRecord> reader;
+	
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		IntegerRecord i1 = reader.next();
+		IntegerRecord i2 = reader.next();
+		IntegerRecord i3 = reader.next();
+		
+		if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+			throw new Exception("Wrong Data Received");
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
new file mode 100644
index 0000000..340465b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/Sender.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class Sender extends AbstractInvokable {
+
+	private RecordWriter<IntegerRecord> writer;
+	
+	@Override
+	public void registerInputOutput() {
+		writer = new RecordWriter<IntegerRecord>(this);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		try {
+			writer.initializeSerializers();
+			writer.emit(new IntegerRecord(42));
+			writer.emit(new IntegerRecord(1337));
+			writer.flush();
+		}
+		finally {
+			writer.clearBuffers();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index 33112af..947a448 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -34,12 +34,12 @@ import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactor
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.RegularPactTask;
@@ -254,8 +254,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		return modelsInput;
 	}
 
-	private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobTaskVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks);
+	private static AbstractJobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
+		AbstractJobVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]", jobGraph, numSubTasks);
 
 		{
 			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
@@ -300,7 +300,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	// Unified solution set and workset tail update
 	// -------------------------------------------------------------------------------------------------------------
 
-	private JobGraph createJobGraphV1(String pointsPath, String centersPath, String resultPath, int numSubTasks) throws JobGraphDefinitionException {
+	private JobGraph createJobGraphV1(String pointsPath, String centersPath, String resultPath, int numSubTasks) {
 
 		// -- init -------------------------------------------------------------------------------------------------
 		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
@@ -310,7 +310,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		// -- vertices ---------------------------------------------------------------------------------------------
 		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
 		InputFormatVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
-		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
+		AbstractJobVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
 		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 
 		// -- edges ------------------------------------------------------------------------------------------------
@@ -319,9 +319,13 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(mapper, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
 		// -- instance sharing -------------------------------------------------------------------------------------
-		points.setVertexToShareInstancesWith(output);
-		models.setVertexToShareInstancesWith(output);
-		mapper.setVertexToShareInstancesWith(output);
+		
+		SlotSharingGroup sharing = new SlotSharingGroup();
+		
+		points.setSlotSharingGroup(sharing);
+		models.setSlotSharingGroup(sharing);
+		mapper.setSlotSharingGroup(sharing);
+		output.setSlotSharingGroup(sharing);
 
 		return jobGraph;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 678a7e5..a31539f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -31,10 +31,11 @@ import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -154,8 +155,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return output;
 	}
 	
-	private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
-		JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
+	private static AbstractJobVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
+		AbstractJobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
 
 		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
 		headConfig.setIterationId(ITERATION_ID);
@@ -188,11 +189,11 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return head;
 	}
 	
-	private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
+	private static AbstractJobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
 			TypeSerializerFactory<?> broadcastVarSerializer, TypeSerializerFactory<?> outputSerializer,
 			TypeComparatorFactory<?> outputComparator)
 	{
-		JobTaskVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
+		AbstractJobVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
 			"Map (Select nearest center)", jobGraph, numSubTasks);
 		
 		TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration());
@@ -217,12 +218,12 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return mapper;
 	}
 	
-	private static JobTaskVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
+	private static AbstractJobVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer,
 			TypeComparatorFactory<?> inputComparator, TypeSerializerFactory<?> outputSerializer)
 	{
 		// ---------------- the tail (co group) --------------------
 		
-		JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph,
+		AbstractJobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph,
 			numSubTasks);
 		
 		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
@@ -252,8 +253,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return tail;
 	}
 	
-	private static OutputFormatVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
-		OutputFormatVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+	private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
+		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, dop);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.setIterationId(ITERATION_ID);
@@ -264,7 +265,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	// Unified solution set and workset tail update
 	// -------------------------------------------------------------------------------------------------------------
 
-	private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int numSubTasks, int numIterations) throws JobGraphDefinitionException {
+	private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int numSubTasks, int numIterations) {
 
 		// -- init -------------------------------------------------------------------------------------------------
 		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
@@ -277,14 +278,14 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
 		InputFormatVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
 		
-		JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
-		JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
+		AbstractJobVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
+		AbstractJobVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
 		
-		JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
+		AbstractJobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
 		
 		OutputFormatVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
 		
-		OutputFormatVertex sync = createSync(jobGraph, numIterations, numSubTasks);
+		AbstractJobVertex sync = createSync(jobGraph, numIterations, numSubTasks);
 		
 		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index c512525..8bf74c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -196,7 +196,6 @@ public abstract class CancellingTestBase {
 						case CANCELED:
 							exitLoop = true;
 							break;
-						case SCHEDULED: // okay
 						case RUNNING:
 							break;
 						default:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 82bd046..2b4b779 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
@@ -45,8 +44,8 @@ public class JobGraphUtils {
 
 	public static final long MEGABYTE = 1024l * 1024l;
 
-	private JobGraphUtils() {
-	}
+	private JobGraphUtils() {}
+	
 
 	public static void submit(JobGraph graph, Configuration nepheleConfig) throws IOException, JobExecutionException {
 		JobClient client = new JobClient(graph, nepheleConfig, JobGraphUtils.class.getClassLoader());
@@ -63,10 +62,10 @@ public class JobGraphUtils {
 	private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
 			int degreeOfParallelism)
 	{
-		InputFormatVertex inputVertex = new InputFormatVertex(graph, name);
+		InputFormatVertex inputVertex = new InputFormatVertex(name);
+		graph.addVertex(inputVertex);
 		
 		inputVertex.setInvokableClass(DataSourceTask.class);
-		
 		inputVertex.setParallelism(degreeOfParallelism);
 
 		TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
@@ -83,42 +82,49 @@ public class JobGraphUtils {
 //	}
 	
 	public static void connect(AbstractJobVertex source, AbstractJobVertex target, ChannelType channelType,
-			DistributionPattern distributionPattern) throws JobGraphDefinitionException
+			DistributionPattern distributionPattern)
 	{
-		source.connectTo(target, channelType, distributionPattern);
+		target.connectNewDataSetAsInput(source, distributionPattern);
 	}
 
-	public static JobTaskVertex createTask(@SuppressWarnings("rawtypes") Class<? extends RegularPactTask> task, String name, JobGraph graph,
-			int degreeOfParallelism)
+	@SuppressWarnings("rawtypes") 
+	public static AbstractJobVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int parallelism)
 	{
-		JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
+		AbstractJobVertex taskVertex = new AbstractJobVertex(name);
+		graph.addVertex(taskVertex);
+		
 		taskVertex.setInvokableClass(task);
-		taskVertex.setNumberOfSubtasks(degreeOfParallelism);
+		taskVertex.setParallelism(parallelism);
 		return taskVertex;
 	}
 
-	public static OutputFormatVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
-		OutputFormatVertex sync = new OutputFormatVertex(jobGraph, "BulkIterationSync");
+	public static AbstractJobVertex createSync(JobGraph jobGraph, int parallelism) {
+		AbstractJobVertex sync = new AbstractJobVertex("BulkIterationSync");
+		jobGraph.addVertex(sync);
+		
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setParallelism(1);
+		
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
+		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, parallelism);
 		return sync;
 	}
 
-	public static OutputFormatVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
-	{
-		OutputFormatVertex outputVertex = new OutputFormatVertex(jobGraph, name);
+	public static OutputFormatVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) {
+		OutputFormatVertex outputVertex = new OutputFormatVertex(name);
+		jobGraph.addVertex(outputVertex);
+		
 		outputVertex.setInvokableClass(FakeOutputTask.class);
 		outputVertex.setParallelism(degreeOfParallelism);
 		return outputVertex;
 	}
 
-	public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
-	{
-		OutputFormatVertex sinkVertex = new OutputFormatVertex(jobGraph, name);
+	public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
+		OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
+		jobGraph.addVertex(sinkVertex);
+		
 		sinkVertex.setInvokableClass(DataSinkTask.class);
-		sinkVertex.setParallelism(degreeOfParallelism);
+		sinkVertex.setParallelism(parallelism);
 		return sinkVertex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index c365378..7c79e35 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -28,14 +28,11 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.network.api.RecordReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobTaskVertex;
-import org.apache.flink.runtime.jobgraph.SimpleInputVertex;
-import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.junit.After;
 
@@ -95,38 +92,44 @@ public class NetworkStackThroughput {
 		}
 
 		private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
-				boolean isSlowReceiver, int numSubtasks) throws JobGraphDefinitionException {
-
+				boolean isSlowReceiver, int numSubtasks)
+		{
 			JobGraph jobGraph = new JobGraph("Speed Test");
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-			SimpleInputVertex producer = new SimpleInputVertex("Speed Test Producer", jobGraph);
+			AbstractJobVertex producer = new AbstractJobVertex("Speed Test Producer");
+			jobGraph.addVertex(producer);
+			producer.setSlotSharingGroup(sharingGroup);
+			
 			producer.setInvokableClass(SpeedTestProducer.class);
-			producer.setNumberOfSubtasks(numSubtasks);
+			producer.setParallelism(numSubtasks);
 			producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
 			producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
 
-			JobTaskVertex forwarder = null;
+			AbstractJobVertex forwarder = null;
 			if (useForwarder) {
-				forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
+				forwarder = new AbstractJobVertex("Speed Test Forwarder");
+				jobGraph.addVertex(forwarder);
+				forwarder.setSlotSharingGroup(sharingGroup);
+				
 				forwarder.setInvokableClass(SpeedTestForwarder.class);
-				forwarder.setNumberOfSubtasks(numSubtasks);
+				forwarder.setParallelism(numSubtasks);
 			}
 
-			SimpleOutputVertex consumer = new SimpleOutputVertex("Speed Test Consumer", jobGraph);
+			AbstractJobVertex consumer = new AbstractJobVertex("Speed Test Consumer");
+			jobGraph.addVertex(consumer);
+			consumer.setSlotSharingGroup(sharingGroup);
+			
 			consumer.setInvokableClass(SpeedTestConsumer.class);
-			consumer.setNumberOfSubtasks(numSubtasks);
+			consumer.setParallelism(numSubtasks);
 			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
 			if (useForwarder) {
-				producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-				forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
-				forwarder.setVertexToShareInstancesWith(producer);
-				consumer.setVertexToShareInstancesWith(producer);
+				forwarder.connectNewDataSetAsInput(producer, DistributionPattern.BIPARTITE);
+				consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.BIPARTITE);
 			}
 			else {
-				producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-				producer.setVertexToShareInstancesWith(consumer);
+				consumer.connectNewDataSetAsInput(producer, DistributionPattern.BIPARTITE);
 			}
 
 			return jobGraph;
@@ -285,9 +288,12 @@ public class NetworkStackThroughput {
 			TestBaseWrapper test = new TestBaseWrapper(config);
 
 			test.startCluster();
-			test.testJob();
-			test.calculateThroughput();
-			test.stopCluster();
+			try {
+				test.testJob();
+				test.calculateThroughput();
+			} finally {
+				test.stopCluster();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/caa4ebef/flink-tests/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/logback-test.xml b/flink-tests/src/test/resources/logback-test.xml
index 7c47e0b..ec37329 100644
--- a/flink-tests/src/test/resources/logback-test.xml
+++ b/flink-tests/src/test/resources/logback-test.xml
@@ -23,12 +23,14 @@
         </encoder>
     </appender>
 
-    <root level="WARN">
+    <root level="INFO">
         <appender-ref ref="STDOUT"/>
     </root>
 
+<!-- 
     <logger name="org.apache.flink.test.recordJobs.relational.query1Util.LineItemFilter" level="ERROR"/>
     <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+    -->
 </configuration>
\ No newline at end of file