You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:08 UTC

[44/63] [abbrv] git commit: More graceful failing/errors/logging when canceling in early job stages

More graceful failing/errors/logging when canceling in early job stages


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae57c7c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae57c7c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae57c7c0

Branch: refs/heads/master
Commit: ae57c7c03dafcbbf728947ee453d29bdf42ee6bc
Parents: 9187175
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 02:43:18 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../runtime/execution/RuntimeEnvironment.java   |  64 ++---
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../runtime/executiongraph/ExecutionVertex.java |  10 +-
 .../flink/runtime/jobmanager/JobManager.java    |  20 +-
 .../flink/runtime/taskmanager/TaskManager.java  |   5 +-
 .../jobmanager/ExceptionOutputFormat.java       |  55 ----
 .../flink/runtime/jobmanager/ExceptionTask.java |  70 -----
 .../runtime/jobmanager/JobManagerITCase.java    | 271 +++++++++++++++++--
 .../jobmanager/tasks/BlockingNoOpInvokable.java |  38 +++
 .../src/test/resources/logback-test.xml         |   1 +
 flink-runtime/src/test/resources/topology.txt   |  16 --
 11 files changed, 351 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 79a4aaa..ade878f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,9 +66,13 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
 
+	private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
+	
 	/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
 	private static final int SLEEPINTERVAL = 100;
 	
+	
+	
 	// --------------------------------------------------------------------------------------------
 
 	/** The task that owns this environment */
@@ -235,33 +238,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			if (this.owner.isCanceled()) {
 				throw new CancelTaskException();
 			}
-		}
-		catch (Throwable t) {
 			
-			if (!this.owner.isCanceled()) {
-
-				// Perform clean up when the task failed and has been not canceled by the user
-				try {
-					this.invokable.cancel();
-				} catch (Throwable t2) {
-					LOG.error(StringUtils.stringifyException(t2));
-				}
-			}
-
-			// Release all resources that may currently be allocated by the individual channels
-			releaseAllChannelResources();
-
-			if (this.owner.isCanceled() || t instanceof CancelTaskException) {
-				this.owner.cancelingDone();
-			}
-			else {
-				this.owner.markFailed(t);
-			}
-
-			return;
-		}
-		
-		try {
 			// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
 			closeInputGates();
 
@@ -273,9 +250,28 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			// Now we wait until all output channels have written out their data and are closed
 			waitForOutputChannelsToBeClosed();
+			
+			if (this.owner.isCanceled()) {
+				throw new CancelTaskException();
+			}
+			
+			// Finally, switch execution state to FINISHED and report to job manager
+			if (!owner.markAsFinished()) {
+				throw new Exception("Could not notify job manager that the task is finished.");
+			}
 		}
 		catch (Throwable t) {
 			
+			if (!this.owner.isCanceled()) {
+
+				// Perform clean up when the task failed and has not been canceled by the user
+				try {
+					this.invokable.cancel();
+				} catch (Throwable t2) {
+					LOG.error("Error while canceling the task", t2);
+				}
+			}
+
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
@@ -285,16 +281,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			else {
 				this.owner.markFailed(t);
 			}
-
-			return;
 		}
-
-		// Release all resources that may currently be allocated by the individual channels
-		releaseAllChannelResources();
-
-		// Finally, switch execution state to FINISHED and report to job manager
-		if (!owner.markAsFinished()) {
-			owner.markFailed(new Exception());
+		finally {
+			// Release all resources that may currently be allocated by the individual channels
+			releaseAllChannelResources();
 		}
 	}
 
@@ -373,7 +363,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			if (this.executingThread == null) {
 				String name = owner.getTaskNameWithSubtasks();
-				this.executingThread = new Thread(this, name);
+				this.executingThread = new Thread(TASK_THREADS, this, name);
 			}
 
 			return this.executingThread;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3dab13e..d916f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -334,14 +334,46 @@ public class ExecutionGraph {
 		}
 	}
 	
-	public void waitForJobEnd() throws InterruptedException {
+	/**
+	 * Blocks until all vertices of this execution graph have finished, or until the timeout expires.
+	 * 
+	 * @param timeout The maximum time to wait, in milliseconds. Zero means to wait without a time limit.
+	 * @throws IllegalArgumentException Thrown, if the timeout is negative.
+	 * @throws InterruptedException Thrown, if the calling thread is interrupted while waiting.
+	 */
+	public void waitForJobEnd(long timeout) throws InterruptedException {
+		if (timeout < 0) {
+			throw new IllegalArgumentException("The timeout must not be negative.");
+		}
+		// Object.wait(timeout) bounds only a single wait, so track the total elapsed time.
+		// Otherwise the loop below would wait forever if the job never finishes.
+		final long startTime = System.currentTimeMillis();
+		
 		synchronized (progressLock) {
 			while (nextVertexToFinish < verticesInCreationOrder.size()) {
-				progressLock.wait();
+				if (timeout == 0) {
+					progressLock.wait();
+				}
+				else {
+					long remaining = timeout - (System.currentTimeMillis() - startTime);
+					if (remaining <= 0) {
+						return;
+					}
+					progressLock.wait(remaining);
+				}
 			}
 		}
 	}
 	
+	/**
+	 * Blocks until all vertices of this execution graph have finished, without a time limit.
+	 * 
+	 * @throws InterruptedException Thrown, if the calling thread is interrupted while waiting.
+	 */
+	public void waitForJobEnd() throws InterruptedException {
+		waitForJobEnd(0);
+	}
+	
 	
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3c65f2e..fcd21af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -274,10 +274,12 @@ public class ExecutionVertex {
 			ExecutionEdge[] sources = inputEdges[i];
 			if (sources != null) {
 				for (int k = 0; k < sources.length; k++) {
-					Instance source = sources[k].getSource().getProducer().getCurrentAssignedResource().getInstance();
-					locations.add(source);
-					if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-						return null;
+					AllocatedSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
+					if (sourceSlot != null) {
+						locations.add(sourceSlot.getInstance());
+						if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+							return null;
+						}
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 3526e15..113f8fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -272,6 +272,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	@Override
 	public JobSubmissionResult submitJob(JobGraph job) throws IOException {
 		
+		
+		ExecutionGraph executionGraph = null;
 		boolean success = false;
 		
 		try {
@@ -285,7 +287,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			}
 			
 			// get the existing execution graph (if we attach), or construct a new empty one to attach
-			ExecutionGraph executionGraph = this.currentJobs.get(job.getJobID());
+			executionGraph = this.currentJobs.get(job.getJobID());
 			if (executionGraph == null) {
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
@@ -331,7 +333,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 				}
 			}
 			catch (FileNotFoundException e) {
-				LOG.error("File-not-Found: " + e.getMessage());
+				String message = "File-not-Found: " + e.getMessage();
+				LOG.error(message);
+				executionGraph.fail(e);
 				return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
 			}
 			
@@ -373,10 +377,25 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		}
 		catch (Throwable t) {
 			LOG.error("Job submission failed.", t);
+			// the failure may happen before the execution graph was obtained or created
+			if (executionGraph != null) {
+				executionGraph.fail(t);
+			}
 			return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
 		}
 		finally {
 			if (!success) {
+				if (executionGraph != null) {
+					if (executionGraph.getState() != JobStatus.FAILING && executionGraph.getState() != JobStatus.FAILED) {
+						executionGraph.fail(new Exception("Could not set up and start execution graph on JobManager"));
+					}
+					try {
+						executionGraph.waitForJobEnd(10000);
+					} catch (InterruptedException e) {
+						LOG.error("Interrupted while waiting for job to finish canceling.", e);
+					}
+				}
+				
 				this.currentJobs.remove(job.getJobID());
 				
 				try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index e8f8b72..1fd5a71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -667,9 +667,10 @@ public class TaskManager implements TaskOperationProtocol {
 		// Unregister task from library cache manager
 		try {
 			LibraryCacheManager.unregister(task.getJobID());
-		} catch (IOException e) {
+		}
+		catch (Throwable t) {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+				LOG.debug("Unregistering the cached libraries caused an exception: ",  t);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
deleted file mode 100644
index 616eaf4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.InitializeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.StringRecord;
-
-
-public class ExceptionOutputFormat implements OutputFormat<StringRecord>, InitializeOnMaster {
-
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * The message which is used for the test runtime exception.
-	 */
-	public static final String RUNTIME_EXCEPTION_MESSAGE = "This is a test runtime exception";
-
-	@Override
-	public void configure(Configuration parameters) {}
-
-	@Override
-	public void open(int taskNumber, int numTasks) {}
-
-	@Override
-	public void writeRecord(StringRecord record) {}
-
-	@Override
-	public void close() {}
-
-	@Override
-	public void initializeGlobal(int parallelism) throws IOException {
-		throw new RuntimeException(RUNTIME_EXCEPTION_MESSAGE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
deleted file mode 100644
index 7a0f9a5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ExceptionTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This task is used during the unit tests to generate a custom exception and check the proper response of the execution
- * engine.
- */
-public class ExceptionTask extends AbstractInvokable {
-
-	/**
-	 * The test error message included in the thrown exception
-	 */
-	public static final String ERROR_MESSAGE = "This is an expected test exception";
-
-	/**
-	 * The custom exception thrown by the this task.
-	 * 
-	 */
-	public static class TestException extends Exception {
-
-		/**
-		 * The generated serial version UID.
-		 */
-		private static final long serialVersionUID = -974961143742490972L;
-
-		/**
-		 * Constructs a new test exception.
-		 * 
-		 * @param msg
-		 *        the error message
-		 */
-		public TestException(String msg) {
-			super(msg);
-		}
-	}
-
-	@Override
-	public void registerInputOutput() {
-		new RecordReader<StringRecord>(this, StringRecord.class);
-		new RecordWriter<StringRecord>(this);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		throw new TestException(ERROR_MESSAGE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 8e87e7b..f661ea0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -32,19 +32,19 @@ import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.api.RecordReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
 import org.apache.flink.runtime.types.IntegerRecord;
-import org.apache.flink.util.StringUtils;
-
 import org.junit.Test;
 
 /**
@@ -53,6 +53,77 @@ import org.junit.Test;
 public class JobManagerITCase {
 	
 	@Test
+	public void testScheduleNotEnoughSlots() {
+		
+		try {
+			final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			vertex.setParallelism(2);
+			vertex.setInvokableClass(BlockingNoOpInvokable.class);
+			
+			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+			
+			final JobManager jm = startJobManager(1);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			try {
+				
+				assertEquals(1, jm.getAvailableSlots());
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+				assertEquals(AbstractJobResult.ReturnCode.ERROR, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					
+					long deadline = System.currentTimeMillis() + 60*1000;
+					boolean failed = false;
+					
+					while (System.currentTimeMillis() < deadline) {
+						JobStatus state = eg.getState();
+						if (state == JobStatus.FAILED) {
+							failed = true;
+							break;
+						}
+						else if (state == JobStatus.FINISHED || state == JobStatus.CANCELED) {
+							break;
+						}
+						else {
+							Thread.sleep(200);
+						}
+					}
+					
+					// the submission was rejected, so the job must not have finished successfully
+					assertTrue("The job did not fail as expected.", failed);
+					assertEquals(0, eg.getRegisteredExecutions().size());
+				}
+				else {
+					// the failed job has already been removed from the current jobs
+				}
+				
+				
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
 	public void testSingleVertexJobImmediately() {
 		
 		final int NUM_TASKS = 133;
@@ -64,7 +135,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
 			
-			JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				
 				assertEquals(NUM_TASKS, jm.getAvailableSlots());
@@ -106,6 +181,12 @@ public class JobManagerITCase {
 				else {
 					// already done, that was fast;
 				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -130,7 +211,11 @@ public class JobManagerITCase {
 			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
 			jobGraph.setAllowQueuedScheduling(true);
 			
-			JobManager jm = startJobManager(10);
+			final JobManager jm = startJobManager(10);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				
 				// we need to register the job at the library cache manager (with no libraries)
@@ -150,6 +235,12 @@ public class JobManagerITCase {
 				else {
 					// already done, that was fast;
 				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -180,7 +271,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(2 * NUM_TASKS);
+			final JobManager jm = startJobManager(2 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -199,6 +294,11 @@ public class JobManagerITCase {
 				else {
 					// already done, that was fast;
 				}
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -229,7 +329,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender, receiver);
 			
-			JobManager jm = startJobManager(2 * NUM_TASKS);
+			final JobManager jm = startJobManager(2 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -248,6 +352,12 @@ public class JobManagerITCase {
 				else {
 					// already done, that was fast;
 				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -260,9 +370,9 @@ public class JobManagerITCase {
 	}
 	
 	@Test
-	public void testTwoInputJob() {
+	public void testTwoInputJobFailingEdgeMismatch() {
 		
-		final int NUM_TASKS = 13;
+		final int NUM_TASKS = 2;
 		
 		try {
 			final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
@@ -274,6 +384,68 @@ public class JobManagerITCase {
 			receiver.setInvokableClass(AgnosticReceiver.class);
 			
 			sender1.setParallelism(NUM_TASKS);
+			sender2.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			
+			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+			
+			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+			
+			final JobManager jm = startJobManager(3 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				// monitor the execution
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				if (eg != null) {
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FAILED, eg.getState());
+				}
+				else {
+					// already done, that was fast;
+				}
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTwoInputJob() {
+		
+		final int NUM_TASKS = 11;
+		
+		try {
+			final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+			final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			
+			sender1.setInvokableClass(Sender.class);
+			sender2.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(AgnosticBinaryReceiver.class);
+			
+			sender1.setParallelism(NUM_TASKS);
 			sender2.setParallelism(2*NUM_TASKS);
 			receiver.setParallelism(3*NUM_TASKS);
 			
@@ -283,6 +455,10 @@ public class JobManagerITCase {
 			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
 			
 			JobManager jm = startJobManager(6 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+								.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -296,11 +472,17 @@ public class JobManagerITCase {
 				
 				if (eg != null) {
 					eg.waitForJobEnd();
-					assertEquals(JobStatus.FAILED, eg.getState());
+					assertEquals(JobStatus.FINISHED, eg.getState());
 				}
 				else {
 					// already done, that was fast;
 				}
+				
+				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -331,7 +513,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				assertEquals(NUM_TASKS, jm.getAvailableSlots());
 				
@@ -354,6 +540,10 @@ public class JobManagerITCase {
 				}
 				
 				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -384,7 +574,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				assertEquals(NUM_TASKS, jm.getAvailableSlots());
 				
@@ -407,6 +601,10 @@ public class JobManagerITCase {
 				}
 				
 				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -437,7 +635,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(2 * NUM_TASKS);
+			final JobManager jm = startJobManager(2 * NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				
 				// we need to register the job at the library cache manager (with no libraries)
@@ -459,6 +661,10 @@ public class JobManagerITCase {
 				}
 				
 				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -492,7 +698,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				assertEquals(NUM_TASKS, jm.getAvailableSlots());
 				
@@ -515,6 +725,10 @@ public class JobManagerITCase {
 				}
 				
 				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -548,7 +762,11 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(NUM_TASKS);
+			
+			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+			
 			try {
 				assertEquals(NUM_TASKS, jm.getAvailableSlots());
 				
@@ -570,11 +788,11 @@ public class JobManagerITCase {
 					// already done, that was fast;
 				}
 				
-				for (Execution e : eg.getRegisteredExecutions().values()) {
-					System.out.println(e + StringUtils.arrayAwareToString(e.getStateTimestamps()));
-				}
-				
 				assertEquals(0, eg.getRegisteredExecutions().size());
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
 				jm.shutdown();
@@ -629,6 +847,21 @@ public class JobManagerITCase {
 		throw new IOException("could not find free port");
 	}
 	
+	/** Joins all threads of the "Task Threads" group. Retries enumerate(), which silently drops threads that do not fit. */
+	private static void waitForTaskThreadsToBeTerminated() throws InterruptedException {
+		Thread[] threads = new Thread[Thread.activeCount() * 2 + 1];
+		int num;
+		while ((num = Thread.enumerate(threads)) >= threads.length) {
+			threads = new Thread[threads.length * 2];
+		}
+		for (int i = 0; i < num; i++) {
+			ThreadGroup tg = threads[i].getThreadGroup();
+			if (tg != null && "Task Threads".equals(tg.getName())) {
+				threads[i].join();
+			}
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Simple test tasks
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c8d1c98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingNoOpInvokable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An invokable that does no work and blocks in {@link #invoke()} until its thread is interrupted.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+	@Override
+	public void registerInputOutput() {}
+
+	@Override
+	public void invoke() throws Exception {
+		final Object lock = new Object();
+		synchronized (lock) {
+			while (true) { lock.wait(); } // loop guards against spurious wakeups; exits only via InterruptedException
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/logback-test.xml b/flink-runtime/src/test/resources/logback-test.xml
index 7fb3387..f817d4d 100644
--- a/flink-runtime/src/test/resources/logback-test.xml
+++ b/flink-runtime/src/test/resources/logback-test.xml
@@ -37,4 +37,5 @@
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.TaskManager" level="OFF"/>
     <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+    <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae57c7c0/flink-runtime/src/test/resources/topology.txt
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/topology.txt b/flink-runtime/src/test/resources/topology.txt
deleted file mode 100644
index b199929..0000000
--- a/flink-runtime/src/test/resources/topology.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-/mainswitch1/rackswitch1/node01
-/mainswitch1/rackswitch1/node02
-/mainswitch1/rackswitch1/node03
-/mainswitch1/rackswitch1/node04
-/mainswitch1/rackswitch2/node05
-/mainswitch1/rackswitch2/node06
-/mainswitch1/rackswitch2/node07
-/mainswitch1/rackswitch2/node08
-/mainswitch2/rackswitch3/node09
-/mainswitch2/rackswitch3/node10
-/mainswitch2/rackswitch3/node11
-/mainswitch2/rackswitch3/node12
-/mainswitch2/rackswitch4/node13
-/mainswitch2/rackswitch4/node14
-/mainswitch2/rackswitch4/node15
-/mainswitch2/rackswitch4/node16