You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/23 23:49:43 UTC

[6/6] git commit: Tasks are marked correctly as failed (not canceled), when the taskmanager kills them during shutdown or reset.

Tasks are marked correctly as failed (not canceled), when the taskmanager kills them during shutdown or reset.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a91be2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a91be2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a91be2e

Branch: refs/heads/master
Commit: 4a91be2e431a029601a45d4d049e47418c4ab5f7
Parents: ab0b3a3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 21:45:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 22:26:25 2014 +0200

----------------------------------------------------------------------
 .../runtime/execution/RuntimeEnvironment.java   |  17 +--
 .../bufferprovider/GlobalBufferPool.java        |   4 +
 .../apache/flink/runtime/taskmanager/Task.java  |  56 ++++++++-
 .../flink/runtime/taskmanager/TaskManager.java  |  17 +--
 .../runtime/jobgraph/JobManagerTestUtils.java   |  13 +-
 .../jobmanager/TaskManagerFailsITCase.java      | 117 ++++++++++++++++++
 .../TaskManagerFailsWithSlotSharingITCase.java  | 122 +++++++++++++++++++
 .../jobmanager/tasks/BlockingReceiver.java      |  39 ++++++
 8 files changed, 365 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 6b60174..e052ccf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -225,7 +225,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	@Override
 	public void run() {
 		// quick fail in case the task was cancelled while the tread was started
-		if (owner.isCanceled()) {
+		if (owner.isCanceledOrFailed()) {
 			owner.cancelingDone();
 			return;
 		}
@@ -235,7 +235,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			this.invokable.invoke();
 
 			// Make sure, we enter the catch block when the task has been canceled
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
 			
@@ -251,7 +251,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			// Now we wait until all output channels have written out their data and are closed
 			waitForOutputChannelsToBeClosed();
 			
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
 			
@@ -262,7 +262,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		}
 		catch (Throwable t) {
 			
-			if (!this.owner.isCanceled()) {
+			if (!this.owner.isCanceledOrFailed()) {
 
 				// Perform clean up when the task failed and has been not canceled by the user
 				try {
@@ -275,10 +275,13 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+			// if we are already set as cancelled or failed (when failure is triggered externally),
+			// mark that the thread is done.
+			if (this.owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
 				this.owner.cancelingDone();
 			}
 			else {
+				// failure from inside the task thread. notify the task of the failure
 				this.owner.markFailed(t);
 			}
 		}
@@ -429,7 +432,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 */
 	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
 		// Make sure, we leave this method with an InterruptedException when the task has been canceled
-		if (this.owner.isCanceled()) {
+		if (this.owner.isCanceledOrFailed()) {
 			return;
 		}
 
@@ -449,7 +452,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		while (!canceled.get()) {
 
 			// Make sure, we leave this method with an InterruptedException when the task has been canceled
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new InterruptedException();
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
index c3a36bb..b1d7adf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -138,4 +138,8 @@ public final class GlobalBufferPool {
 			this.buffers.clear();
 		}
 	}
+	
+	public boolean isDestroyed() {
+		return isDestroyed;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f106614..d393e2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -147,9 +147,10 @@ public final class Task {
 		return environment;
 	}
 	
-	public boolean isCanceled() {
+	public boolean isCanceledOrFailed() {
 		return executionState == ExecutionState.CANCELING ||
-				executionState == ExecutionState.CANCELED;
+				executionState == ExecutionState.CANCELED ||
+				executionState == ExecutionState.FAILED;
 	}
 	
 	public String getTaskName() {
@@ -242,11 +243,60 @@ public final class Task {
 		}
 	}
 	
+	/**
+	 * Sets the tasks to be cancelled and reports a failure back to the master.
+	 * This method is important if a failure needs to be reported to the master, because
+	 * a simple cancel would mark the task as canceled rather than failed.
+	 * 
+	 * @param cause The exception to report in the error message
+	 */
+	public void failExternally(Throwable cause) {
+		while (true) {
+			ExecutionState current = this.executionState;
+			
+			// if the task is already canceled (or canceling) or finished or failed,
+			// then we need not do anything
+			if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
+					current == ExecutionState.CANCELING || current == ExecutionState.FAILED)
+			{
+				return;
+			}
+			
+			if (current == ExecutionState.DEPLOYING) {
+				// directly set to failed
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+					
+					notifyObservers(ExecutionState.FAILED, null);
+					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+					return;
+				}
+			}
+			else if (current == ExecutionState.RUNNING) {
+				// go directly to failed and perform the actual task canceling
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+					try {
+						this.environment.cancelExecution();
+					} catch (Throwable e) {
+						LOG.error("Error while cancelling the task.", e);
+					}
+					
+					notifyObservers(ExecutionState.FAILED, null);
+					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+					
+					return;
+				}
+			}
+			else {
+				throw new RuntimeException("unexpected state for cancelling: " + current);
+			}
+		}
+	}
+	
 	public void cancelingDone() {
 		while (true) {
 			ExecutionState current = this.executionState;
 			
-			if (current == ExecutionState.CANCELED) {
+			if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
 				return;
 			}
 			if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1819b79..305d39f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -398,7 +398,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 		LOG.info("Shutting down TaskManager");
 		
-		cancelAndClearEverything();
+		cancelAndClearEverything(new Exception("Task Manager is shutting down"));
 		
 		// first, stop the heartbeat thread and wait for it to terminate
 		this.heartbeatThread.interrupt();
@@ -699,11 +699,14 @@ public class TaskManager implements TaskOperationProtocol {
 	/**
 	 * Removes all tasks from this TaskManager.
 	 */
-	public void cancelAndClearEverything() {
-		LOG.info("Cancelling all computations and discarding all cached data.");
-		for (Task t : runningTasks.values()) {
-			t.cancelExecution();
-			runningTasks.remove(t.getExecutionId());
+	public void cancelAndClearEverything(Throwable cause) {
+		if (runningTasks.size() > 0) {
+			LOG.info("Cancelling all computations and discarding all cached data.");
+			
+			for (Task t : runningTasks.values()) {
+				t.failExternally(cause);
+				runningTasks.remove(t.getExecutionId());
+			}
 		}
 	}
 	
@@ -841,7 +844,7 @@ public class TaskManager implements TaskOperationProtocol {
 					
 					// mark us as disconnected and abort all computation
 					this.registeredId = null;
-					cancelAndClearEverything();
+					cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
 					
 					// wait for a while, then attempt to register again
 					try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index eebe9b0..8ed7a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -32,11 +32,16 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 public class JobManagerTestUtils {
 
 	public static final JobManager startJobManager(int numSlots) throws Exception {
+		return startJobManager(1, numSlots);
+	}
+	
+	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
-		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
 		
 		GlobalConfiguration.includeConfiguration(cfg);
 		
@@ -46,11 +51,13 @@ public class JobManagerTestUtils {
 		// max time is 5 seconds
 		long deadline = System.currentTimeMillis() + 5000;
 		
-		while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
+		while (jm.getNumberOfSlotsAvailableToScheduler() < numTaskManagers * numSlotsPerTaskManager &&
+				System.currentTimeMillis() < deadline)
+		{
 			Thread.sleep(10);
 		}
 		
-		assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
+		assertEquals(numTaskManagers * numSlotsPerTaskManager, jm.getNumberOfSlotsAvailableToScheduler());
 		
 		return jm;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
new file mode 100644
index 0000000..def20a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+public class TaskManagerFailsITCase {
+
+	@Test
+	public void testExecutionWithFailingTaskManager() {
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			final JobManager jm = startJobManager(2, NUM_TASKS);
+			
+			final TaskManager tm1 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+			final TaskManager tm2 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[1];
+			
+			final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+			final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				// wait until everyone has settled in
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+					
+					boolean allrunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allrunning = false;
+							break;
+						}
+					}
+					
+					if (allrunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+				
+				// kill one task manager
+				TaskManager tm = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+				tm.shutdown();
+				
+				eg.waitForJobEnd();
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertTrue(bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+				assertTrue(bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
new file mode 100644
index 0000000..d6fec59
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+public class TaskManagerFailsWithSlotSharingITCase {
+
+	@Test
+	public void testExecutionWithFailingTaskManager() {
+		final int NUM_TASKS = 20;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			final JobManager jm = startJobManager(2, NUM_TASKS / 2);
+			
+			final TaskManager tm1 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+			final TaskManager tm2 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[1];
+			
+			final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+			final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+			
+			try {
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				// wait until everyone has settled in
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+					
+					boolean allrunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allrunning = false;
+							break;
+						}
+					}
+					
+					if (allrunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+				
+				// kill one task manager
+				TaskManager tm = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+				tm.shutdown();
+				
+				eg.waitForJobEnd();
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertTrue(bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+				assertTrue(bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
new file mode 100644
index 0000000..f7d4ee5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class BlockingReceiver extends AbstractInvokable {
+	
+	@Override
+	public void registerInputOutput() {
+		new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		Object o = new Object();
+		synchronized (o) {
+			o.wait();
+		}
+	}
+}