Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/18 07:36:46 UTC

[1/5] flink git commit: [FLINK-7056][tests][hotfix] make sure the client and a created InputStream are closed

Repository: flink
Updated Branches:
  refs/heads/master 6c6d90084 -> 7b2362406


[FLINK-7056][tests][hotfix] make sure the client and a created InputStream are closed

If they are not closed and the server has not yet sent all data packets, the server
may still hold the read lock and block any write operations (see also FLINK-7467).

This closes #4558.
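
For illustration, the pattern the fix moves the tests towards is roughly the
following try-with-resources idiom; this is a sketch only (serverAddress,
blobClientConfig and key stand in for whatever the individual test uses), not
the exact test code:

// closing both the stream and the client releases the server-side read lock
// even if not all data packets have been consumed by the client
try (BlobClient client = new BlobClient(serverAddress, blobClientConfig);
     InputStream is = client.get(key)) {
    // read (a prefix of) the BLOB contents
}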


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0a15060
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0a15060
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0a15060

Branch: refs/heads/master
Commit: d0a150609b46cabfe7f5f0d760c465dcee5588fb
Parents: 6c6d900
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 17 12:04:09 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 18 09:29:18 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/blob/BlobClientTest.java      | 59 ++++++++++----------
 .../runtime/blob/BlobServerDeleteTest.java      | 11 ++--
 .../flink/runtime/blob/BlobServerGetTest.java   |  7 ++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 25 +++------
 4 files changed, 47 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index cfec4c5..d511e86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -139,30 +139,35 @@ public class BlobClientTest {
 	 * the specified buffer.
 	 * 
 	 * @param inputStream
-	 *        the input stream returned from the GET operation
+	 *        the input stream returned from the GET operation (will be closed by this method)
 	 * @param buf
 	 *        the buffer to compare the input stream's data to
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream
 	 */
-	static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
-		byte[] receivedBuffer = new byte[buf.length];
+	static void validateGetAndClose(final InputStream inputStream, final byte[] buf) throws IOException {
+		try {
+			byte[] receivedBuffer = new byte[buf.length];
 
-		int bytesReceived = 0;
+			int bytesReceived = 0;
 
-		while (true) {
+			while (true) {
 
-			final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
-			if (read < 0) {
-				throw new EOFException();
-			}
-			bytesReceived += read;
+				final int read = inputStream
+					.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
+				if (read < 0) {
+					throw new EOFException();
+				}
+				bytesReceived += read;
 
-			if (bytesReceived == receivedBuffer.length) {
-				assertEquals(-1, inputStream.read());
-				assertArrayEquals(buf, receivedBuffer);
-				return;
+				if (bytesReceived == receivedBuffer.length) {
+					assertEquals(-1, inputStream.read());
+					assertArrayEquals(buf, receivedBuffer);
+					return;
+				}
 			}
+		} finally {
+			inputStream.close();
 		}
 	}
 
@@ -171,13 +176,13 @@ public class BlobClientTest {
 	 * the specified file.
 	 * 
 	 * @param inputStream
-	 *        the input stream returned from the GET operation
+	 *        the input stream returned from the GET operation (will be closed by this method)
 	 * @param file
 	 *        the file to compare the input stream's data to
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream or the file
 	 */
-	private static void validateGet(final InputStream inputStream, final File file) throws IOException {
+	private static void validateGetAndClose(final InputStream inputStream, final File file) throws IOException {
 
 		InputStream inputStream2 = null;
 		try {
@@ -200,6 +205,7 @@ public class BlobClientTest {
 			if (inputStream2 != null) {
 				inputStream2.close();
 			}
+			inputStream.close();
 		}
 
 	}
@@ -231,14 +237,11 @@ public class BlobClientTest {
 			assertEquals(origKey, receivedKey);
 
 			// Retrieve the data
-			InputStream is = client.get(receivedKey);
-			validateGet(is, testBuffer);
-			is = client.get(jobId, receivedKey);
-			validateGet(is, testBuffer);
+			validateGetAndClose(client.get(receivedKey), testBuffer);
+			validateGetAndClose(client.get(jobId, receivedKey), testBuffer);
 
 			// Check reaction to invalid keys
-			try {
-				client.get(new BlobKey());
+			try (InputStream ignored = client.get(new BlobKey())) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -246,8 +249,7 @@ public class BlobClientTest {
 			}
 			// new client needed (closed from failure above)
 			client = new BlobClient(serverAddress, getBlobClientConfig());
-			try {
-				client.get(jobId, new BlobKey());
+			try (InputStream ignored = client.get(jobId, new BlobKey())) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -308,10 +310,8 @@ public class BlobClientTest {
 			is = null;
 
 			// Retrieve the data
-			is = client.get(receivedKey);
-			validateGet(is, testFile);
-			is = client.get(jobId, receivedKey);
-			validateGet(is, testFile);
+			validateGetAndClose(client.get(receivedKey), testFile);
+			validateGetAndClose(client.get(jobId, receivedKey), testFile);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -362,8 +362,7 @@ public class BlobClientTest {
 		assertEquals(1, blobKeys.size());
 
 		try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
-			InputStream is = blobClient.get(blobKeys.get(0));
-			validateGet(is, testFile);
+			validateGetAndClose(blobClient.get(blobKeys.get(0)), testFile);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index d91aae42..413e2e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -99,8 +101,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			client.close();
 
 			client = new BlobClient(serverAddress, config);
-			try {
-				client.get(key1);
+			try (InputStream ignored = client.get(key1)) {
 				fail("BLOB should have been deleted");
 			}
 			catch (IOException e) {
@@ -111,12 +112,14 @@ public class BlobServerDeleteTest extends TestLogger {
 
 			client = new BlobClient(serverAddress, config);
 			try {
-				client.get(jobId, key1);
+				// NOTE: the server will stall in its send operation until either the data is fully
+				//       read or the socket is closed, e.g. via a client.close() call
+				validateGetAndClose(client.get(jobId, key1), data);
 			}
 			catch (IOException e) {
-				// expected
 				fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
 			}
+			client.close();
 
 			// delete a file directly on the server
 			server.delete(key2);

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 5ad8d95..7ccf075 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -122,7 +123,7 @@ public class BlobServerGetTest extends TestLogger {
 			assertNotNull(key);
 			assertEquals(key, key2);
 			// request for jobId2 should succeed
-			getFileHelper(client, jobId2, key);
+			validateGetAndClose(getFileHelper(client, jobId2, key), data);
 			// request for jobId1 should still fail
 			client = verifyDeleted(client, jobId1, key, serverAddress, config);
 
@@ -160,8 +161,7 @@ public class BlobServerGetTest extends TestLogger {
 	private static BlobClient verifyDeleted(
 			BlobClient client, JobID jobId, BlobKey key,
 			InetSocketAddress serverAddress, Configuration config) throws IOException {
-		try {
-			getFileHelper(client, jobId, key);
+		try (InputStream ignored = getFileHelper(client, jobId, key)) {
 			fail("This should not succeed.");
 		} catch (IOException e) {
 			// expected
@@ -227,6 +227,7 @@ public class BlobServerGetTest extends TestLogger {
 			catch (IOException e) {
 				// expected
 			}
+			is.close();
 		} finally {
 			if (client != null) {
 				client.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index f55adb7..2b8e2d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -46,8 +46,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -226,9 +226,9 @@ public class BlobServerPutTest extends TestLogger {
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
 	 * @param key1
-	 * 		first key
+	 * 		first key for 44 bytes starting at byte 10 of data in the BLOB
 	 * @param key2
-	 * 		second key
+	 * 		second key for the complete data in the BLOB
 	 * @param data
 	 * 		expected data
 	 * @param serverAddress
@@ -241,12 +241,9 @@ public class BlobServerPutTest extends TestLogger {
 			InetSocketAddress serverAddress, Configuration config) throws IOException {
 
 		BlobClient client = new BlobClient(serverAddress, config);
-		InputStream is1 = null;
-		InputStream is2 = null;
 
-		try {
-			// one get request on the same client
-			is1 = getFileHelper(client, jobId, key2);
+		// one get request on the same client
+		try (InputStream is1 = getFileHelper(client, jobId, key2)) {
 			byte[] result1 = new byte[44];
 			BlobUtils.readFully(is1, result1, 0, result1.length, null);
 			is1.close();
@@ -255,20 +252,12 @@ public class BlobServerPutTest extends TestLogger {
 				assertEquals(data[j], result1[i]);
 			}
 
-			// close the client and create a new one for the remaining requests
+			// close the client and create a new one for the remaining request
 			client.close();
 			client = new BlobClient(serverAddress, config);
 
-			is2 = getFileHelper(client, jobId, key1);
-			BlobClientTest.validateGet(is2, data);
-			is2.close();
+			validateGetAndClose(getFileHelper(client, jobId, key1), data);
 		} finally {
-			if (is1 != null) {
-				is1.close();
-			}
-			if (is2 != null) {
-				is1.close();
-			}
 			client.close();
 		}
 	}


[3/5] flink git commit: [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 23f0a38..933c7a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -90,6 +90,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -158,6 +159,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
@@ -179,6 +181,9 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 			archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty()));
 
+			BlobServer blobServer = new BlobServer(
+				flinkConfiguration,
+				testingHighAvailabilityServices.createBlobStore());
 			Props jobManagerProps = Props.create(
 				TestingJobManager.class,
 				flinkConfiguration,
@@ -186,11 +191,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				instanceManager,
 				scheduler,
-				new BlobLibraryCacheManager(
-					new BlobServer(
-						flinkConfiguration,
-						testingHighAvailabilityServices.createBlobStore()),
-					3600000L),
+				blobServer,
+				new BlobLibraryCacheManager(blobServer),
 				archive,
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
@@ -353,6 +355,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
 
+			BlobServer blobServer = mock(BlobServer.class);
 			Props jobManagerProps = Props.create(
 				TestingFailingHAJobManager.class,
 				flinkConfiguration,
@@ -360,7 +363,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				mock(InstanceManager.class),
 				mock(Scheduler.class),
-				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+				blobServer,
+				new BlobLibraryCacheManager(blobServer),
 				ActorRef.noSender(),
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
@@ -397,6 +401,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			Executor ioExecutor,
 			InstanceManager instanceManager,
 			Scheduler scheduler,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			ActorRef archive,
 			RestartStrategyFactory restartStrategyFactory,
@@ -413,6 +418,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				ioExecutor,
 				instanceManager,
 				scheduler,
+				blobServer,
 				libraryCacheManager,
 				archive,
 				restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 3c75971..6a39293 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -137,14 +137,13 @@ public class JobSubmitTest {
 			// upload two dummy bytes and add their keys to the job graph as dependencies
 			BlobKey key1, key2;
 			BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
-			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			JobID jobId = null;
+			JobID jobId = jg.getJobID();
 			try {
 				key1 = bc.put(jobId, new byte[10]);
 				key2 = bc.put(jobId, new byte[10]);
 
 				// delete one of the blobs to make sure that the startup failed
-				bc.delete(key2);
+				bc.delete(jobId, key2);
 			}
 			finally {
 				bc.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index df35369..2c17b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -92,8 +93,8 @@ public class JobMasterTest extends TestLogger {
 
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
-		final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class);
-		when(libraryCacheManager.getBlobServerPort()).thenReturn(1337);
+		BlobServer blobServer = mock(BlobServer.class);
+		when(blobServer.getPort()).thenReturn(1337);
 
 		final JobGraph jobGraph = new JobGraph();
 
@@ -106,7 +107,8 @@ public class JobMasterTest extends TestLogger {
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
-				libraryCacheManager,
+				blobServer,
+				mock(BlobLibraryCacheManager.class),
 				mock(RestartStrategyFactory.class),
 				Time.of(10, TimeUnit.SECONDS),
 				null,
@@ -204,6 +206,7 @@ public class JobMasterTest extends TestLogger {
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
+				mock(BlobServer.class),
 				mock(BlobLibraryCacheManager.class),
 				mock(RestartStrategyFactory.class),
 				Time.of(10, TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 70800e5..230ca91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,9 +25,10 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -47,13 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -178,6 +177,9 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
 		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 
+		configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+		BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
 		return Props.create(
 			TestingJobManager.class,
 			configuration,
@@ -185,7 +187,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
+			blobServer,
+			new BlobLibraryCacheManager(blobServer),
 			ActorRef.noSender(),
 			new NoRestartStrategy.NoRestartStrategyFactory(),
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 43ff60b..6842bee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -668,7 +669,7 @@ public class TaskExecutorTest extends TestLogger {
 				Collections.<InputGateDeploymentDescriptor>emptyList());
 
 		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
-		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
 			jobId,
@@ -677,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));
@@ -1191,6 +1193,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 085a386..392dc29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -145,6 +146,7 @@ public class TaskAsyncCallTest {
 	}
 	
 	private static Task createTask() throws Exception {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 		
@@ -195,6 +197,7 @@ public class TaskAsyncCallTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 1ebd4ad..ac0df36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -98,6 +99,7 @@ public class TaskStopTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			tmRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ba3e820..d4cd0cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -227,7 +228,8 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testLibraryCacheRegistrationFailed() {
 		try {
-			Task task = createTask(TestInvokableCorrect.class, mock(LibraryCacheManager.class));
+			Task task = createTask(TestInvokableCorrect.class, mock(BlobCache.class),
+				mock(LibraryCacheManager.class));
 
 			// task should be new and perfect
 			assertEquals(ExecutionState.CREATED, task.getExecutionState());
@@ -260,6 +262,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsInNetworkRegistration() {
 		try {
+			BlobCache blobCache = mock(BlobCache.class);
 			// mock a working library cache
 			LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 			when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -274,7 +277,7 @@ public class TaskTest extends TestLogger {
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-			Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
+			Task task = createTask(TestInvokableCorrect.class, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
 
 			task.registerExecutionListener(listener);
 
@@ -617,6 +620,7 @@ public class TaskTest extends TestLogger {
 		IntermediateDataSetID resultId = new IntermediateDataSetID();
 		ResultPartitionID partitionId = new ResultPartitionID();
 
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
@@ -629,7 +633,7 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 			.thenReturn(mock(TaskKvStateRegistry.class));
 
-		createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+		createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 		// Test all branches of trigger partition state check
 
@@ -638,7 +642,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// PartitionProducerDisposedException
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -654,7 +658,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Any other exception
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -671,7 +675,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// TimeoutException handled special => retry
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -702,7 +706,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Success
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -882,26 +886,30 @@ public class TaskTest extends TestLogger {
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, libCache, config, new ExecutionConfig());
+		return createTask(invokable, blobCache, libCache, config, new ExecutionConfig());
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, libCache, config, execConfig);
+		return createTask(invokable, blobCache, libCache, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache) throws IOException {
 
-		return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobCache, libCache, new Configuration(), new ExecutionConfig());
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache,
 			Configuration config,
 			ExecutionConfig execConfig) throws IOException {
@@ -916,21 +924,23 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
+		return createTask(invokable, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache,
 			NetworkEnvironment networkEnvironment,
 			ResultPartitionConsumableNotifier consumableNotifier,
 			PartitionProducerStateChecker partitionProducerStateChecker,
 			Executor executor) throws IOException {
-		return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobCache, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
 	}
 	
 	private Task createTask(
 		Class<? extends AbstractInvokable> invokable,
+		BlobCache blobCache,
 		LibraryCacheManager libCache,
 		NetworkEnvironment networkEnvironment,
 		ResultPartitionConsumableNotifier consumableNotifier,
@@ -991,6 +1001,7 @@ public class TaskTest extends TestLogger {
 			taskManagerConnection,
 			inputSplitProvider,
 			checkpointResponder,
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
new file mode 100644
index 0000000..37c141d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+	private static volatile boolean blocking = true;
+	private static final Object lock = new Object();
+
+	@Override
+	public void invoke() throws Exception {
+		while (blocking) {
+			synchronized (lock) {
+				lock.wait();
+			}
+		}
+		throw new RuntimeException("This exception is expected.");
+	}
+
+	public static void unblock() {
+		blocking = false;
+
+		synchronized (lock) {
+			lock.notifyAll();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c1df5a3..229f1eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -178,6 +179,7 @@ public class JvmExitOnFatalErrorTest {
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
 						new NoOpCheckpointResponder(),
+						mock(BlobCache.class),
 						new FallbackLibraryCacheManager(),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 1b9ee48..95da981 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -264,14 +264,15 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
       components._1,
       components._2,
       components._3,
-      ActorRef.noSender,
       components._4,
+      ActorRef.noSender,
       components._5,
+      components._6,
       highAvailabilityServices.getJobManagerLeaderElectionService(
         HighAvailabilityServices.DEFAULT_JOB_ID),
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
-      components._8,
+      components._9,
       None)
 
     _system.actorOf(props)

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e5655bb..87f8088 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,6 +28,7 @@ import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -110,6 +111,7 @@ class TestingCluster(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -127,6 +129,7 @@ class TestingCluster(
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index f50a832..8b9ce15 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -34,15 +35,16 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-/** JobManager implementation extended by testing messages
-  *
-  */
+/**
+ * JobManager implementation extended by testing messages
+ */
 class TestingJobManager(
     flinkConfiguration: Configuration,
     futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -58,6 +60,7 @@ class TestingJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 3b8178b..82642ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -156,6 +157,7 @@ public class BlockingCheckpointsTest {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
+				mock(BlobCache.class),
 				new FallbackLibraryCacheManager(),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 82e4f31..14ae733 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -274,6 +275,7 @@ public class InterruptSensitiveRestoreTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			new FallbackLibraryCacheManager(),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 702d833..79e9583 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -153,6 +154,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			new FallbackLibraryCacheManager(),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 09e9a1b..08c3207 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -796,6 +797,7 @@ public class StreamTaskTest extends TestLogger {
 			StreamConfig taskConfig,
 			Configuration taskManagerConfig) throws Exception {
 
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 
@@ -844,6 +846,7 @@ public class StreamTaskTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index b539961..bd72d6d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,6 +59,7 @@ class TestingYarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -73,6 +75,7 @@ class TestingYarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a2d1668..b8dacee 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -49,7 +50,8 @@ import scala.language.postfixOps
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
@@ -61,6 +63,7 @@ class YarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -76,6 +79,7 @@ class YarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,


[2/5] flink git commit: [hotfix] increase Scala checkstyle maxParameters to 20

Posted by tr...@apache.org.
[hotfix] increase Scala checkstyle maxParameters to 20


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c80d407
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c80d407
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c80d407

Branch: refs/heads/master
Commit: 9c80d407c6b31cf449572ad8cc90da0f15fb2b16
Parents: d0a1506
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 4 11:50:07 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 18 09:29:31 2017 +0200

----------------------------------------------------------------------
 tools/maven/scalastyle-config.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c80d407/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
index 0f7f6bb..848b2af 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
  <!-- </check> -->
  <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
   <parameters>
-   <parameter name="maxParameters"><![CDATA[15]]></parameter>
+   <parameter name="maxParameters"><![CDATA[20]]></parameter>
   </parameters>
  </check>
  <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->


[5/5] flink git commit: [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Posted by tr...@apache.org.
[FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Also change from BlobKey-based to job-based ref-counting, which is simpler and
the mode we want to use from now on. Deferred cleanup (as before) is not
implemented yet (TODO).
At the BlobServer, no ref-counting will be used; instead, cleanup will happen
when the job enters a final state (TODO).

[FLINK-7057][blob] change to a cleaner API for BlobService#registerJob()

[FLINK-7057][blob] implement deferred cleanup at the BlobCache

Whenever a job is no longer referenced at the BlobCache, we set a TTL and let
the cleanup task remove its files once the TTL has expired and the task runs.
For now, this means that a BLOB will be retained at most
(2 * ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) seconds after it is
no longer referenced. We do this so that a recovery still has the chance to
reuse existing files rather than download them again.
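
For illustration only (not part of the original commit message): a minimal
sketch of the resulting usage contract. registerJob(), getFile() and
releaseJob() are the BlobCache methods added or changed here; the helper class
and method around them are hypothetical.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;

import java.io.File;
import java.io.IOException;

class BlobCacheUsageSketch {
    // hypothetical helper; shows the register/release bracket around getFile()
    static void useJobBlob(BlobCache cache, JobID jobId, BlobKey key) throws IOException {
        cache.registerJob(jobId);                 // pins the job's BLOBs
        try {
            File jar = cache.getFile(jobId, key); // only valid while registered
            // ... use the file ...
        } finally {
            cache.releaseJob(jobId);              // last release starts the TTL countdown
        }
    }
}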

[FLINK-7057][blob] integrate cleanup of job-related JARs from the BlobServer

TODO: an integration test that verifies that the cleanup is actually performed
when desired and skipped when it is not, e.g. if the job did not reach a final
execution state

[FLINK-7057][tests] extract FailingBlockingInvokable from CoordinatorShutdownTest

[FLINK-7057][blob] add an integration test for the BlobServer cleanup

This ensures that BLOB files are actually deleted when a job enters a final
state.

[FLINK-7057][tests] refrain from catching an exception just to fail the test

Removes code like this from the BLOB store unit tests:

catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
}
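
The preferred style simply declares the exception on the test method so that
JUnit fails the test with the original stack trace, e.g. (minimal sketch, test
name hypothetical):

import org.junit.Test;

public class BlobStoreTestStyleSketch {
    @Test
    public void testPutBuffer() throws Exception {
        // any exception thrown here fails the test and preserves the stack trace
    }
}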

[FLINK-7057][blob] fix BlobServer#cleanupJob() being too eager

Instead of deleting the job's directory, it was deleting the parent storage
directory.

[FLINK-7057][blob] fix BlobServer cleanup integration

* the test did not check the correct directories for cleanup
* the test did not honour the test timeout

[FLINK-7057][blob] test and fix BlobServer cleanup for a failed job submission

[FLINK-7057][blob] rework the LibraryCacheManager API

Since ref-counting has moved to the BlobCache, the BlobLibraryCacheManager is
just a thin wrapper to get a user class loader by retrieving BLOBs from the
BlobCache/BlobServer. Therefore, move the job-registration/-release out of it,
too, and restrict its use to the task manager where the BlobCache is used (on
the BlobServer, jobs do not need registration since they are only used once and
will be deleted when they enter a final state).

This makes the BlobServer and BlobCache instances available at the JobManager
and TaskManager instances, respectively, also enabling future use cases outside
of the LibraryCacheManager.

[FLINK-7057][blob] address PR comments

[FLINK-7057][blob] fix JobManagerLeaderElectionTest

[FLINK-7057][blob] re-introduce some ref-counting for BlobLibraryCacheManager

Apparently, we do need to return the same ClassLoader for different (parallel)
tasks of a job running on the same task manager. Therefore, keep the initial
task registration implementation that was removed with
8331fbb208d975e0c1ec990344c14315ea08dd4a and only adapt it here. This also
restores some tests and adds new combinations not tested before.
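
As a sketch of this contract (helper class and setup hypothetical; the API is
the one reworked by this commit), two parallel task registrations for the same
job resolve to one and the same class loader:

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

class SharedClassLoaderSketch {
    static void sketch(BlobService blobService, Collection<BlobKey> keys) throws IOException {
        BlobLibraryCacheManager mgr = new BlobLibraryCacheManager(blobService);
        JobID jobId = new JobID();

        mgr.registerTask(jobId, new ExecutionAttemptID(), keys, Collections.<URL>emptyList());
        mgr.registerTask(jobId, new ExecutionAttemptID(), keys, Collections.<URL>emptyList());

        // both task attempts of the job share the very same class loader instance
        System.out.println(mgr.getClassLoader(jobId) == mgr.getClassLoader(jobId)); // true
    }
}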

[FLINK-7057][blob] address PR comments

[FLINK-7057][tests] fix (manual/ignored) BlobCacheCleanupTest#testJobDeferredCleanup()

[FLINK-7057][hotfix] fix a checkstyle error

[FLINK-7057][blob] remove the extra lock object from BlobCache

We can lock on jobRefCounters instead, which is what we are guarding anyway.

[FLINK-7057][blob] minor improvements to the TTL in BlobCache

Do not use Long.MAX_VALUE as a sentinel for "keep forever". Also add more comments.

[FLINK-7057][blob] replace "library-cache-manager.cleanup.interval" with "blob.service.cleanup.interval"

Since the cleanup has moved into the BLOB service classes, the new key name
matches where the setting now applies.
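
For illustration (hypothetical snippet; the withDeprecatedKeys() mapping is
part of the BlobServerOptions change below), configurations still using the
old key keep working:

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

class DeprecatedKeySketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setLong("library-cache-manager.cleanup.interval", 1800L); // deprecated key

        // resolves via the new option, falling back to the deprecated key
        System.out.println(config.getLong(BlobServerOptions.CLEANUP_INTERVAL)); // 1800
    }
}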

[FLINK-7057][hotfix] remove an unused import

[FLINK-7057][docs] adapt javadocs of JobManager descendents

[FLINK-7057][blob] increase JobManagerCleanupITCase timeout

The previous value of 15s seems to be too low for some runs on Travis.

[FLINK-7057][blob] provide more debug output in JobManagerCleanupITCase

If the BlobServer's directory is not cleaned up within the remaining time,
also print which files remain. This may help in debugging the situation.

This closes #4238.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b236240
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b236240
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b236240

Branch: refs/heads/master
Commit: 7b23624066c46d58c7b7181e5576a9834af9ac7a
Parents: 9c80d40
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jun 27 18:29:44 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Aug 18 09:29:32 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   7 +
 .../flink/configuration/BlobServerOptions.java  |  16 +-
 .../flink/configuration/ConfigConstants.java    |   9 +-
 .../clusterframework/MesosJobManager.scala      |   8 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 140 ++++-
 .../apache/flink/runtime/blob/BlobClient.java   |  21 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  40 +-
 .../runtime/blob/BlobServerConnection.java      |  18 +-
 .../apache/flink/runtime/blob/BlobService.java  |   5 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |  25 +-
 .../apache/flink/runtime/client/JobClient.java  |   3 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |   3 +-
 .../dispatcher/StandaloneDispatcher.java        |   5 +-
 .../entrypoint/JobClusterEntrypoint.java        |   3 +-
 .../librarycache/BlobLibraryCacheManager.java   | 327 ++++++-----
 .../FallbackLibraryCacheManager.java            |   8 +-
 .../librarycache/LibraryCacheManager.java       |  25 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   7 +-
 .../runtime/jobmaster/JobManagerServices.java   |  20 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  14 +-
 .../taskexecutor/JobManagerConnection.java      |  32 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  11 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  13 +-
 .../ContaineredJobManager.scala                 |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  48 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   5 +
 .../flink/runtime/taskmanager/TaskManager.scala |  34 +-
 .../runtime/blob/BlobCacheCleanupTest.java      | 328 +++++++++++
 .../runtime/blob/BlobCacheRetriesTest.java      |   4 +-
 .../flink/runtime/blob/BlobClientTest.java      |  29 +-
 .../apache/flink/runtime/blob/BlobKeyTest.java  |   6 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  85 ++-
 .../flink/runtime/blob/BlobUtilsTest.java       |   3 +-
 .../checkpoint/CoordinatorShutdownTest.java     |  23 +-
 .../runtime/dispatcher/DispatcherTest.java      |   3 +-
 .../BlobLibraryCacheManagerTest.java            | 540 +++++++++++++------
 .../BlobLibraryCacheRecoveryITCase.java         |  36 +-
 .../jobmanager/JobManagerCleanupITCase.java     | 300 +++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  20 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   5 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   9 +-
 .../JobManagerLeaderElectionTest.java           |  11 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   5 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +
 .../flink/runtime/taskmanager/TaskTest.java     |  35 +-
 .../testtasks/FailingBlockingInvokable.java     |  48 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   2 +
 .../jobmanager/JobManagerRegistrationTest.scala |   5 +-
 .../runtime/testingUtils/TestingCluster.scala   |   3 +
 .../testingUtils/TestingJobManager.scala        |   9 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +
 .../tasks/InterruptSensitiveRestoreTest.java    |   2 +
 .../tasks/StreamTaskTerminationTest.java        |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +
 .../flink/yarn/TestingYarnJobManager.scala      |   3 +
 .../org/apache/flink/yarn/YarnJobManager.scala  |   6 +-
 59 files changed, 1759 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 4138b4d..e0b9d4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -196,6 +196,13 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.
 
+- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of the blob caches (DEFAULT: 1 hour).
+Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic cleanup task
+(executed every `blob.service.cleanup.interval` seconds) remove its blob files after this TTL has passed.
+This means that a blob will be retained at most <tt>2 * `blob.service.cleanup.interval`</tt> seconds after
+not being referenced anymore. Therefore, a recovery still has the chance to use existing files rather
+than to download them again.
+
 - `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
 
 - `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).
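
For example (illustrative value, not part of the diff above), halving the
interval in flink-conf.yaml also halves the worst-case retention window:

blob.service.cleanup.interval: 1800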

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index e27c29f..019580a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
- * Configuration options for the BlobServer.
+ * Configuration options for the BlobServer and BlobCache.
  */
 @PublicEvolving
 public class BlobServerOptions {
@@ -73,4 +73,18 @@ public class BlobServerOptions {
 	public static final ConfigOption<Boolean> SSL_ENABLED =
 		key("blob.service.ssl.enabled")
 			.defaultValue(true);
+
+	/**
+	 * Cleanup interval of the blob caches at the task managers (in seconds).
+	 *
+	 * <p>Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic
+	 * cleanup task (executed every CLEANUP_INTERVAL seconds) remove its blob files after this TTL
+	 * has passed. This means that a blob will be retained at most <tt>2 * CLEANUP_INTERVAL</tt>
+	 * seconds after not being referenced anymore. Therefore, a recovery still has the chance to use
+	 * existing files rather than to download them again.
+	 */
+	public static final ConfigOption<Long> CLEANUP_INTERVAL =
+		key("blob.service.cleanup.interval")
+			.defaultValue(3_600L) // once per hour
+			.withDeprecatedKeys("library-cache-manager.cleanup.interval");
 }
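
A minimal sketch of how this option is consumed (mirroring the BlobCache
change further below, which reads the value in seconds and converts it to
milliseconds for the cleanup timer; class and main() are hypothetical):

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

class CleanupIntervalSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration(); // e.g. parsed from flink-conf.yaml
        long cleanupIntervalMillis =
            config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; // default: 3_600_000
        // worst case: a job becomes unreferenced right after a cleanup run, so
        // its BLOBs may live for up to 2 * CLEANUP_INTERVAL seconds
        System.out.println(cleanupIntervalMillis);
    }
}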

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4c6c62a..4153e45 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,10 @@ public final class ConfigConstants {
 
 	/**
 	 * The config parameter defining the cleanup interval of the library cache manager.
+	 *
+	 * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
 	 */
+	@Deprecated
 	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
 
 	/**
@@ -1253,8 +1256,12 @@ public final class ConfigConstants {
 
 	/**
 	 * The default library cache manager cleanup interval in seconds
+	 *
+	 * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
 	 */
-	public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 3600;
+	@Deprecated
+	public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL =
+		BlobServerOptions.CLEANUP_INTERVAL.defaultValue();
 	
 	/**
 	 * The default network port to connect to for communication with the job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 3e7c55f..f854a1e 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 
-/** JobManager actor for execution on Mesos. .
+/** JobManager actor for execution on Mesos.
   *
   * @param flinkConfiguration Configuration object for the actor
   * @param futureExecutor Execution context which is used to execute concurrent tasks in the
@@ -43,7 +44,8 @@ import scala.concurrent.duration._
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
@@ -55,6 +57,7 @@ class MesosJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -70,6 +73,7 @@ class MesosJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 29f7706..c50a888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,6 +33,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -47,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * download it from a distributed file system (if available) or the BLOB
  * server.</p>
  */
-public final class BlobCache implements BlobService {
+public class BlobCache extends TimerTask implements BlobService {
 
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
@@ -71,6 +76,32 @@ public final class BlobCache implements BlobService {
 	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
 	private final Configuration blobClientConfig;
 
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Job reference counters with a time-to-live (TTL).
+	 */
+	private static class RefCount {
+		/**
+		 * Number of references to a job.
+		 */
+		public int references = 0;
+		
+		/**
+		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
+		 * non-positive values).
+		 */
+		public long keepUntil = -1;
+	}
+
+	/** Map to store the number of references to a specific job */
+	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+
+	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
+	private final long cleanupInterval;
+
+	private final Timer cleanupTimer;
+
 	/**
 	 * Instantiates a new BLOB cache.
 	 *
@@ -108,11 +139,63 @@ public final class BlobCache implements BlobService {
 			this.numFetchRetries = 0;
 		}
 
+		// Initializing the clean up task
+		this.cleanupTimer = new Timer(true);
+
+		cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
 		// Add shutdown hook to delete storage directory
 		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 	}
 
 	/**
+	 * Registers use of job-related BLOBs.
+	 * <p>
+	 * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
+	 * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 *
+	 * @see #releaseJob(JobID)
+	 */
+	public void registerJob(JobID jobId) {
+		synchronized (jobRefCounters) {
+			RefCount ref = jobRefCounters.get(jobId);
+			if (ref == null) {
+				ref = new RefCount();
+				jobRefCounters.put(jobId, ref);
+			}
+			++ref.references;
+		}
+	}
+
+	/**
+	 * Unregisters use of job-related BLOBs and allows them to be released.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 *
+	 * @see #registerJob(JobID)
+	 */
+	public void releaseJob(JobID jobId) {
+		synchronized (jobRefCounters) {
+			RefCount ref = jobRefCounters.get(jobId);
+
+			if (ref == null) {
+				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+				return;
+			}
+
+			--ref.references;
+			if (ref.references == 0) {
+				ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
+			}
+		}
+	}
+
+	/**
 	 * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
 	 * <p>
 	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
@@ -148,7 +231,7 @@ public final class BlobCache implements BlobService {
 	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
 	 */
 	@Override
-	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		return getFileInternal(jobId, key);
 	}
@@ -258,7 +341,7 @@ public final class BlobCache implements BlobService {
 	 * @throws IOException
 	 */
 	@Override
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -307,7 +390,7 @@ public final class BlobCache implements BlobService {
 	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
 	 * 		BLOB server cannot delete the file
 	 */
-	public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void deleteGlobal(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteGlobalInternal(jobId, key);
 	}
@@ -341,8 +424,40 @@ public final class BlobCache implements BlobService {
 		return serverAddress.getPort();
 	}
 
+	/**
+	 * Cleans up BLOBs which are not referenced anymore.
+	 */
+	@Override
+	public void run() {
+		synchronized (jobRefCounters) {
+			Iterator<Map.Entry<JobID, RefCount>> entryIter = jobRefCounters.entrySet().iterator();
+			final long currentTimeMillis = System.currentTimeMillis();
+
+			while (entryIter.hasNext()) {
+				Map.Entry<JobID, RefCount> entry = entryIter.next();
+				RefCount ref = entry.getValue();
+
+				if (ref.references <= 0 && ref.keepUntil > 0 && currentTimeMillis >= ref.keepUntil) {
+					JobID jobId = entry.getKey();
+
+					final File localFile =
+						new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+					try {
+						FileUtils.deleteDirectory(localFile);
+						// let's only remove this directory from cleanup if the cleanup was successful
+						entryIter.remove();
+					} catch (Throwable t) {
+						LOG.warn("Failed to locally delete job directory " + localFile.getAbsolutePath(), t);
+					}
+				}
+			}
+		}
+	}
+
 	@Override
 	public void close() throws IOException {
+		cleanupTimer.cancel();
+
 		if (shutdownRequested.compareAndSet(false, true)) {
 			LOG.info("Shutting down BlobCache");
 
@@ -369,8 +484,19 @@ public final class BlobCache implements BlobService {
 		return new BlobClient(serverAddress, blobClientConfig);
 	}
 
-	public File getStorageDir() {
-		return this.storageDir;
+	/**
+	 * Returns a file handle to the file associated with the given blob key on the blob
+	 * server.
+	 *
+	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+	 *
+	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key identifying the file
+	 * @return file handle to the file
+	 */
+	@VisibleForTesting
+	public File getStorageLocation(JobID jobId, BlobKey key) {
+		return BlobUtils.getStorageLocation(storageDir, jobId, key);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9a2f59e..8f1487a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
@@ -166,7 +165,7 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 * 		if an I/O error occurs during the download
 	 */
-	public InputStream get(@Nonnull JobID jobId, BlobKey blobKey) throws IOException {
+	public InputStream get(JobID jobId, BlobKey blobKey) throws IOException {
 		checkNotNull(jobId);
 		return getInternal(jobId, blobKey);
 	}
@@ -339,7 +338,7 @@ public final class BlobClient implements Closeable {
 	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
 	 * 		data to the BLOB server
 	 */
-	public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) throws IOException {
+	public BlobKey put(JobID jobId, InputStream inputStream) throws IOException {
 		checkNotNull(jobId);
 		return putInputStream(jobId, inputStream);
 	}
@@ -369,7 +368,7 @@ public final class BlobClient implements Closeable {
 		checkNotNull(value);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, socket.getLocalSocketAddress());
+			LOG.debug("PUT BLOB buffer (" + len + " bytes) to " + socket.getLocalSocketAddress() + ".");
 		}
 
 		try {
@@ -556,7 +555,7 @@ public final class BlobClient implements Closeable {
 	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
 	 * 		BLOB server cannot delete the file
 	 */
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -603,23 +602,22 @@
 
 	/**
 	 * Uploads the JAR files to a {@link BlobServer} at the given address.
-	 * <p>
-	 * TODO: add jobId to signature after adapting the BlobLibraryCacheManager
 	 *
 	 * @param serverAddress
 	 * 		Server address of the {@link BlobServer}
 	 * @param clientConfig
 	 * 		Any additional configuration for the blob client
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param jars
 	 * 		List of JAR files to upload
 	 *
 	 * @throws IOException
 	 * 		if the upload fails
 	 */
-	public static List<BlobKey> uploadJarFiles(
-			InetSocketAddress serverAddress,
-			Configuration clientConfig,
-			List<Path> jars) throws IOException {
+	public static List<BlobKey> uploadJarFiles(InetSocketAddress serverAddress,
+			Configuration clientConfig, JobID jobId, List<Path> jars) throws IOException {
+		checkNotNull(jobId);
 		if (jars.isEmpty()) {
 			return Collections.emptyList();
 		} else {
@@ -631,7 +628,7 @@ public final class BlobClient implements Closeable {
 					FSDataInputStream is = null;
 					try {
 						is = fs.open(jar);
-						final BlobKey key = blobClient.putInputStream(null, is);
+						final BlobKey key = blobClient.putInputStream(jobId, is);
 						blobKeys.add(key);
 					} finally {
 						if (is != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 43a060a..bfcf881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -29,7 +30,6 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import java.io.File;
@@ -196,7 +196,8 @@ public class BlobServer extends Thread implements BlobService {
 	 * @param key identifying the file
 	 * @return file handle to the file
 	 */
-	File getStorageLocation(JobID jobId, BlobKey key) {
+	@VisibleForTesting
+	public File getStorageLocation(JobID jobId, BlobKey key) {
 		return BlobUtils.getStorageLocation(storageDir, jobId, key);
 	}
 
@@ -374,7 +375,7 @@ public class BlobServer extends Thread implements BlobService {
 	 * 		Thrown if the file retrieval failed.
 	 */
 	@Override
-	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		return getFileInternal(jobId, key);
 	}
@@ -450,7 +451,7 @@ public class BlobServer extends Thread implements BlobService {
 	 * @throws IOException
 	 */
 	@Override
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -483,6 +484,37 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
+	 * Removes all BLOBs from local and HA store belonging to the given job ID.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 */
+	public void cleanupJob(JobID jobId) {
+		checkNotNull(jobId);
+
+		final File jobDir =
+			new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+
+		readWriteLock.writeLock().lock();
+
+		try {
+			// delete locally
+			try {
+				FileUtils.deleteDirectory(jobDir);
+			} catch (IOException e) {
+				LOG.warn("Failed to locally delete BLOB storage directory at " +
+					jobDir.getAbsolutePath(), e);
+			}
+
+			// delete in HA store
+			blobStore.deleteAll(jobId);
+		} finally {
+			readWriteLock.writeLock().unlock();
+		}
+	}
+
+
+	/**
 	 * Returns the port on which the server is listening.
 	 *
 	 * @return port on which the server is listening
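
A hypothetical call site for the new method (the actual integration lives in
the JobManager changes of this commit): once a job enters a globally final
state, its BLOBs are wiped in one call:

// jobId identifies the job that reached a final state (assumed to be in scope)
blobServer.cleanupJob(jobId); // deletes the local job directory and the HA store entries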

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index f1054c0..7f617f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -139,14 +139,7 @@ class BlobServerConnection extends Thread {
 			LOG.error("Error while executing BLOB connection.", t);
 		}
 		finally {
-			try {
-				if (clientSocket != null) {
-					clientSocket.close();
-				}
-			} catch (Throwable t) {
-				LOG.debug("Exception while closing BLOB server connection socket.", t);
-			}
-
+			closeSilently(clientSocket, LOG);
 			blobServer.unregisterConnection(this);
 		}
 	}
@@ -433,9 +426,8 @@ class BlobServerConnection extends Thread {
 			final InputStream inputStream, final File incomingFile, final byte[] buf)
 			throws IOException {
 		MessageDigest md = BlobUtils.createMessageDigest();
-		FileOutputStream fos = new FileOutputStream(incomingFile);
 
-		try {
+		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
 			while (true) {
 				final int bytesExpected = readLength(inputStream);
 				if (bytesExpected == -1) {
@@ -453,12 +445,6 @@ class BlobServerConnection extends Thread {
 				md.update(buf, 0, bytesExpected);
 			}
 			return new BlobKey(md.digest());
-		} finally {
-			try {
-				fos.close();
-			} catch (Throwable t) {
-				LOG.warn("Cannot close stream to BLOB staging file", t);
-			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index a78c88c..0db5a58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 
-import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -50,7 +49,7 @@ public interface BlobService extends Closeable {
 	 * @throws java.io.FileNotFoundException when the path does not exist;
 	 * @throws IOException if any other error occurs when retrieving the file
 	 */
-	File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException;
+	File getFile(JobID jobId, BlobKey key) throws IOException;
 
 	/**
 	 * Deletes the (job-unrelated) file associated with the provided blob key.
@@ -67,7 +66,7 @@ public interface BlobService extends Closeable {
 	 * @param key associated with the file to be deleted
 	 * @throws IOException
 	 */
-	void delete(@Nonnull JobID jobId, BlobKey key) throws IOException;
+	void delete(JobID jobId, BlobKey key) throws IOException;
 
 	/**
 	 * Returns the port of the blob service.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 9b5724b..dabd1bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -29,8 +29,8 @@ import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -175,7 +175,7 @@ public class BlobUtils {
 	static File getIncomingDirectory(File storageDir) {
 		final File incomingDir = new File(storageDir, "incoming");
 
-		mkdirTolerateExisting(incomingDir, "incoming");
+		mkdirTolerateExisting(incomingDir);
 
 		return incomingDir;
 	}
@@ -185,15 +185,13 @@ public class BlobUtils {
 	 *
 	 * @param dir
 	 * 		directory to create
-	 * @param dirType
-	 * 		the type of the directory (included in error message if something fails)
 	 */
-	private static void mkdirTolerateExisting(final File dir, final String dirType) {
+	private static void mkdirTolerateExisting(final File dir) {
 		// note: thread-safe create should try to mkdir first and then ignore the case that the
 		//       directory already existed
 		if (!dir.mkdirs() && !dir.exists()) {
 			throw new RuntimeException(
-				"Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
+				"Cannot create directory '" + dir.getAbsolutePath() + "'.");
 		}
 	}
 
@@ -210,10 +208,10 @@ public class BlobUtils {
 	 * @return the (designated) physical storage location of the BLOB
 	 */
 	static File getStorageLocation(
-			@Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+			File storageDir, @Nullable JobID jobId, BlobKey key) {
 		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
-		mkdirTolerateExisting(file.getParentFile(), "cache");
+		mkdirTolerateExisting(file.getParentFile());
 
 		return file;
 	}
@@ -229,7 +227,7 @@ public class BlobUtils {
 	 *
 	 * @return the storage directory for BLOBs belonging to the job with the given ID
 	 */
-	static String getStorageLocationPath(@Nonnull String storageDir, @Nullable JobID jobId) {
+	static String getStorageLocationPath(String storageDir, @Nullable JobID jobId) {
 		if (jobId == null) {
 			// format: $base/no_job
 			return String.format("%s/%s", storageDir, NO_JOB_DIR_PREFIX);
@@ -256,7 +254,7 @@ public class BlobUtils {
 	 * @return the path to the given BLOB
 	 */
 	static String getStorageLocationPath(
-			@Nonnull String storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+			String storageDir, @Nullable JobID jobId, BlobKey key) {
 		if (jobId == null) {
 			// format: $base/no_job/blob_$key
 			return String.format("%s/%s/%s%s",
@@ -273,7 +271,6 @@ public class BlobUtils {
 	 *
 	 * @return a new instance of the message digest to use for the BLOB key computation
 	 */
-	@Nonnull
 	static MessageDigest createMessageDigest() {
 		try {
 			return MessageDigest.getInstance(HASHING_ALGORITHM);
@@ -285,7 +282,7 @@ public class BlobUtils {
 	/**
 	 * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
 	 */
-	static Thread addShutdownHook(final BlobService service, final Logger logger) {
+	static Thread addShutdownHook(final Closeable service, final Logger logger) {
 		checkNotNull(service);
 		checkNotNull(logger);
 
@@ -399,9 +396,7 @@ public class BlobUtils {
 			try {
 				socket.close();
 			} catch (Throwable t) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Error while closing resource after BLOB transfer.", t);
-				}
+				LOG.debug("Exception while closing BLOB server connection socket.", t);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9cc6210..425461c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,8 +234,7 @@ public class JobClient {
 			int pos = 0;
 			for (BlobKey blobKey : props.requiredJarFiles()) {
 				try {
-					// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-					allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
+					allURLs[pos++] = blobClient.getFile(jobID, blobKey).toURI().toURL();
 				} catch (Exception e) {
 					try {
 						blobClient.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9fc1fc4..bb0b3e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -231,7 +230,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		Configuration configuration,
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
-		BlobService blobService,
+		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
 		OnCompletionActions onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 54d698e..dfd6a8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -65,7 +64,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobService blobService,
+			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			OnCompletionActions onCompleteActions,
@@ -77,7 +76,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			configuration,
 			rpcService,
 			highAvailabilityServices,
-			blobService,
+			blobServer,
 			heartbeatServices,
 			metricRegistry,
 			onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 8728186..a7c6120 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -90,7 +89,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ResourceID resourceId,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobService blobService,
+			BlobServer blobService,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 9aff6f9..c8fc4e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -23,9 +23,12 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -33,72 +36,52 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * For each job graph that is submitted to the system the library cache manager maintains
- * a set of libraries (typically JAR files) which the job requires to run. The library cache manager
- * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
- * programming pattern, so there exists at most one library manager at a time.
- * <p>
- * All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted
- * and are removed by a timer-based cleanup task if their reference counter is zero.
+ * Provides facilities to download a set of libraries (typically JAR files) for a job from a
+ * {@link BlobService} and create a class loader with references to them.
  */
-public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
+public class BlobLibraryCacheManager implements LibraryCacheManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
+
+	private static final ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);
 
-	private static Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
-	
-	private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);
-	
 	// --------------------------------------------------------------------------------------------
-	
+
 	/** The global lock to synchronize operations */
 	private final Object lockObject = new Object();
 
 	/** Registered entries per job */
-	private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<JobID, LibraryCacheEntry>();
-	
-	/** Map to store the number of reference to a specific file */
-	private final Map<BlobKey, Integer> blobKeyReferenceCounters = new HashMap<BlobKey, Integer>();
+	private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>();
 
 	/** The blob service to download libraries */
 	private final BlobService blobService;
-	
-	private final Timer cleanupTimer;
-	
+
 	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * Creates the blob library cache manager.
-	 *
-	 * @param blobService blob file retrieval service to use
-	 * @param cleanupInterval cleanup interval in milliseconds
-	 */
-	public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) {
+	public BlobLibraryCacheManager(BlobService blobService) {
 		this.blobService = checkNotNull(blobService);
-
-		// Initializing the clean up task
-		this.cleanupTimer = new Timer(true);
-		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
 	}
 
-	// --------------------------------------------------------------------------------------------
-	
 	@Override
 	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-			throws IOException {
+		throws IOException {
 		registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, requiredClasspaths);
 	}
-	
+
 	@Override
-	public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) throws IOException {
+	public void registerTask(
+		JobID jobId,
+		ExecutionAttemptID task,
+		@Nullable Collection<BlobKey> requiredJarFiles,
+		@Nullable Collection<URL> requiredClasspaths) throws IOException {
+
 		checkNotNull(jobId, "The JobId must not be null.");
 		checkNotNull(task, "The task execution id must not be null.");
 
@@ -113,43 +96,31 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
 
 			if (entry == null) {
-				// create a new entry in the library cache
-				BlobKey[] keys = requiredJarFiles.toArray(new BlobKey[requiredJarFiles.size()]);
-				URL[] urls = new URL[keys.length + requiredClasspaths.size()];
-
+				URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
 				int count = 0;
 				try {
-					for (; count < keys.length; count++) {
-						BlobKey blobKey = keys[count];
-						urls[count] = registerReferenceToBlobKeyAndGetURL(blobKey);
-					}
-				}
-				catch (Throwable t) {
-					// undo the reference count increases
-					try {
-						for (int i = 0; i < count; i++) {
-							unregisterReferenceToBlobKey(keys[i]);
-						}
+					// add URLs to locally cached JAR files
+					for (BlobKey key : requiredJarFiles) {
+						urls[count] = blobService.getFile(jobId, key).toURI().toURL();
+						++count;
 					}
-					catch (Throwable tt) {
-						LOG.error("Error while updating library reference counters.", tt);
+
+					// add classpaths
+					for (URL url : requiredClasspaths) {
+						urls[count] = url;
+						++count;
 					}
 
+					cacheEntries.put(jobId, new LibraryCacheEntry(
+						requiredJarFiles, requiredClasspaths, urls, task));
+				} catch (Throwable t) {
 					// rethrow or wrap
 					ExceptionUtils.tryRethrowIOException(t);
-					throw new IOException("Library cache could not register the user code libraries.", t);
+					throw new IOException(
+						"Library cache could not register the user code libraries.", t);
 				}
-
-				// add classpaths
-				for (URL url : requiredClasspaths) {
-					urls[count] = url;
-					count++;
-				}
-
-				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, urls, task));
-			}
-			else {
-				entry.register(task, requiredJarFiles);
+			} else {
+				entry.register(task, requiredJarFiles, requiredClasspaths);
 			}
 		}
 	}
@@ -158,7 +129,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	public void unregisterJob(JobID id) {
 		unregisterTask(id, JOB_ATTEMPT_ID);
 	}
-	
+
 	@Override
 	public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
 		checkNotNull(jobId, "The JobId must not be null.");
@@ -172,162 +143,167 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					cacheEntries.remove(jobId);
 
 					entry.releaseClassLoader();
-
-					for (BlobKey key : entry.getLibraries()) {
-						unregisterReferenceToBlobKey(key);
-					}
 				}
 			}
 			// else has already been unregistered
 		}
 	}
-
+	
 	@Override
-	public ClassLoader getClassLoader(JobID id) {
-		if (id == null) {
-			throw new IllegalArgumentException("The JobId must not be null.");
-		}
-		
+	public ClassLoader getClassLoader(JobID jobId) {
+		checkNotNull(jobId, "The JobId must not be null.");
+
 		synchronized (lockObject) {
-			LibraryCacheEntry entry = cacheEntries.get(id);
-			if (entry != null) {
-				return entry.getClassLoader();
-			} else {
-				throw new IllegalStateException("No libraries are registered for job " + id);
+			LibraryCacheEntry entry = cacheEntries.get(jobId);
+			if (entry == null) {
+				throw new IllegalStateException("No libraries are registered for job " + jobId);
 			}
+			return entry.getClassLoader();
 		}
 	}
 
-	public int getBlobServerPort() {
-		return blobService.getPort();
-	}
-
-	@Override
-	public void shutdown() throws IOException{
-		try {
-			run();
-		} catch (Throwable t) {
-			LOG.warn("Failed to run clean up task before shutdown", t);
-		}
-
-		blobService.close();
-		cleanupTimer.cancel();
-	}
-	
 	/**
-	 * Cleans up blobs which are not referenced anymore
+	 * Gets the number of tasks holding {@link ClassLoader} references for the given job.
+	 *
+	 * @param jobId ID of a job
+	 *
+	 * @return number of reference holders
 	 */
-	@Override
-	public void run() {
-		synchronized (lockObject) {
-			Iterator<Map.Entry<BlobKey, Integer>> entryIter = blobKeyReferenceCounters.entrySet().iterator();
-			
-			while (entryIter.hasNext()) {
-				Map.Entry<BlobKey, Integer> entry = entryIter.next();
-				BlobKey key = entry.getKey();
-				int references = entry.getValue();
-				
-				try {
-					if (references <= 0) {
-						blobService.delete(key);
-						entryIter.remove();
-					}
-				} catch (Throwable t) {
-					LOG.warn("Could not delete file with blob key" + key, t);
-				}
-			}
-		}
-	}
-	
-	public int getNumberOfReferenceHolders(JobID jobId) {
+	int getNumberOfReferenceHolders(JobID jobId) {
 		synchronized (lockObject) {
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
 			return entry == null ? 0 : entry.getNumberOfReferenceHolders();
 		}
 	}
-	
-	int getNumberOfCachedLibraries() {
-		return blobKeyReferenceCounters.size();
-	}
-	
-	private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException {
-		// it is important that we fetch the URL before increasing the counter.
-		// in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter
-		try {
-			URL url = blobService.getFile(key).toURI().toURL();
 
-			Integer references = blobKeyReferenceCounters.get(key);
-			int newReferences = references == null ? 1 : references + 1;
-			blobKeyReferenceCounters.put(key, newReferences);
-
-			return url;
-		}
-		catch (IOException e) {
-			throw new IOException("Cannot get library with hash " + key, e);
-		}
+	/**
+	 * Returns the number of registered jobs that this library cache manager handles.
+	 *
+	 * @return number of jobs (irrespective of the actual number of tasks per job)
+	 */
+	int getNumberOfManagedJobs() {
+		// no synchronisation necessary
+		return cacheEntries.size();
 	}
-	
-	private void unregisterReferenceToBlobKey(BlobKey key) {
-		Integer references = blobKeyReferenceCounters.get(key);
-		if (references != null) {
-			int newReferences = Math.max(references - 1, 0);
-			blobKeyReferenceCounters.put(key, newReferences);
-		}
-		else {
-			// make sure we have an entry in any case, that the cleanup timer removes any
-			// present libraries
-			blobKeyReferenceCounters.put(key, 0);
+
+	@Override
+	public void shutdown() {
+		synchronized (lockObject) {
+			for (LibraryCacheEntry entry : cacheEntries.values()) {
+				entry.releaseClassLoader();
+			}
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * An entry in the per-job library cache. Tracks which execution attempts
 	 * still reference the libraries. Once none reference it any more, the
-	 * libraries can be cleaned up.
+	 * class loaders can be cleaned up.
 	 */
 	private static class LibraryCacheEntry {
-		
+
 		private final FlinkUserCodeClassLoader classLoader;
-		
+
 		private final Set<ExecutionAttemptID> referenceHolders;
-		
+		/**
+		 * Set of BLOB keys used for a previous job/task registration.
+		 *
+		 * <p>The purpose of this is to make sure that future registrations do not differ in
+		 * content, as this is a contract of the {@link BlobLibraryCacheManager}.
+		 */
 		private final Set<BlobKey> libraries;
-		
-		
-		public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] libraryURLs, ExecutionAttemptID initialReference) {
+
+		/**
+		 * Set of class path URLs used for a previous job/task registration.
+		 *
+		 * <p>The purpose of this is to make sure that future registrations do not differ in
+		 * content, as this is a contract of the {@link BlobLibraryCacheManager}.
+		 */
+		private final Set<String> classPaths;
+
+		/**
+		 * Creates a cache entry for a Flink class loader with the given <tt>libraryURLs</tt>.
+		 *
+		 * @param requiredLibraries
+		 * 		BLOB keys required by the class loader (stored for ensuring consistency among different
+		 * 		job/task registrations)
+		 * @param requiredClasspaths
+		 * 		class paths required by the class loader (stored for ensuring consistency among
+		 * 		different job/task registrations)
+		 * @param libraryURLs
+		 * 		complete list of URLs to use for the class loader (includes references to the
+		 * 		<tt>requiredLibraries</tt> and <tt>requiredClasspaths</tt>)
+		 * @param initialReference
+		 * 		reference holder ID
+		 */
+		LibraryCacheEntry(
+				Collection<BlobKey> requiredLibraries,
+				Collection<URL> requiredClasspaths,
+				URL[] libraryURLs,
+				ExecutionAttemptID initialReference) {
+
 			this.classLoader = new FlinkUserCodeClassLoader(libraryURLs);
-			this.libraries = new HashSet<>(libraries);
+			// NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons
+			//       see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
+			//       -> alternatively, compare their string representation
+			this.classPaths = new HashSet<>(requiredClasspaths.size());
+			for (URL url : requiredClasspaths) {
+				classPaths.add(url.toString());
+			}
+			this.libraries = new HashSet<>(requiredLibraries);
 			this.referenceHolders = new HashSet<>();
 			this.referenceHolders.add(initialReference);
 		}
-		
-		
+
 		public ClassLoader getClassLoader() {
 			return classLoader;
 		}
-		
+
 		public Set<BlobKey> getLibraries() {
 			return libraries;
 		}
-		
-		public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
-			if (!libraries.containsAll(keys)) {
+
+		public void register(
+				ExecutionAttemptID task, Collection<BlobKey> requiredLibraries,
+				Collection<URL> requiredClasspaths) {
+
+			// Make sure the previous registration referred to the same libraries and class paths.
+			// NOTE: the original collections may contain duplicates and may not be Set
+			//       collections with fast containment checks.
+
+			// lazy construction of a new set for faster comparisons
+			if (libraries.size() != requiredLibraries.size() ||
+				!new HashSet<>(requiredLibraries).containsAll(libraries)) {
+
 				throw new IllegalStateException(
-						"The library registration references a different set of libraries than previous registrations for this job.");
+					"The library registration references a different set of library BLOBs than" +
+						" previous registrations for this job:\nold:" + libraries.toString() +
+						"\nnew:" + requiredLibraries.toString());
 			}
-			
+
+			// lazy construction of a new set with String representations of the URLs
+			if (classPaths.size() != requiredClasspaths.size() ||
+				!requiredClasspaths.stream().map(URL::toString).collect(Collectors.toSet())
+					.containsAll(classPaths)) {
+
+				throw new IllegalStateException(
+					"The library registration references a different set of class paths than" +
+						" previous registrations for this job:\nold:" +
+						classPaths.toString() +
+						"\nnew:" + requiredClasspaths.toString());
+			}
+
 			this.referenceHolders.add(task);
 		}
-		
+
 		public boolean unregister(ExecutionAttemptID task) {
 			referenceHolders.remove(task);
 			return referenceHolders.isEmpty();
 		}
-		
-		public int getNumberOfReferenceHolders() {
+
+		int getNumberOfReferenceHolders() {
 			return referenceHolders.size();
 		}
 
@@ -343,5 +319,4 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			}
 		}
 	}
-
 }
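
A sketch, not part of the commit, of the consistency contract that LibraryCacheEntry.register()
enforces above: repeated registrations for the same job must reference content-equal sets of BLOB
keys and class paths, otherwise an IllegalStateException is raised. The helper below de-duplicates
the incoming collection before comparing, which the committed code approximates with the size
check plus containsAll():

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class SameSetCheck {
	/** Throws if {@code required} does not match the previously registered {@code known} set. */
	static <T> void checkSameSet(Set<T> known, Collection<T> required) {
		// the incoming collection may contain duplicates, so compare via a de-duplicated copy
		Set<T> requiredSet = new HashSet<>(required);
		if (!requiredSet.equals(known)) {
			throw new IllegalStateException(
				"registration references a different set than previous registrations:" +
					"\nold:" + known + "\nnew:" + required);
		}
	}
}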

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 8e14e58..41eeb18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -28,7 +28,7 @@ import java.net.URL;
 import java.util.Collection;
 
 public class FallbackLibraryCacheManager implements LibraryCacheManager {
-	
+
 	private static Logger LOG = LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
 
 	@Override
@@ -40,10 +40,10 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
 	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
-	
+
 	@Override
 	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) {
+		Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
 
@@ -51,7 +51,7 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
 	public void unregisterJob(JobID id) {
 		LOG.warn("FallbackLibraryCacheManager does not do any bookkeeping of job IDs.");
 	}
-	
+
 	@Override
 	public void unregisterTask(JobID id, ExecutionAttemptID execution) {
 		LOG.warn("FallbackLibraryCacheManager does not do any bookkeeping of job IDs.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 5f9f443..93c6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
 public interface LibraryCacheManager {
+
 	/**
 	 * Returns the user code class loader associated with id.
 	 *
@@ -36,30 +37,34 @@ public interface LibraryCacheManager {
 	ClassLoader getClassLoader(JobID id);
 
 	/**
-	 * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys.
+	 * Registers a job with its required jar files and classpaths. The jar files are identified by
+	 * their blob keys and downloaded for use by a {@link ClassLoader}.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
+	 *
 	 * @throws IOException if any error occurs when retrieving the required jar files
 	 *
 	 * @see #unregisterJob(JobID) counterpart of this method
 	 */
 	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-			throws IOException;
-	
+		throws IOException;
+
 	/**
-	 * Registers a job task execution with its required jar files and classpaths. The jar files are identified by their blob keys.
+	 * Registers a job task execution with its required jar files and classpaths. The jar files are
+	 * identified by their blob keys and downloaded for use by a {@link ClassLoader}.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
-	 * @throws IOException
+	 *
+	 * @throws IOException if any error occurs when retrieving the required jar files
 	 *
 	 * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this method
 	 */
 	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) throws IOException;
+		Collection<URL> requiredClasspaths) throws IOException;
 
 	/**
 	 * Unregisters a job task execution from the library cache manager.
@@ -88,9 +93,7 @@ public interface LibraryCacheManager {
 	void unregisterJob(JobID id);
 
 	/**
-	 * Shutdown method
-	 *
-	 * @throws IOException
+	 * Shutdown method which may release created class loaders.
 	 */
-	void shutdown() throws IOException;
+	void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b92d79..c126875 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -536,7 +536,7 @@ public class JobGraph implements Serializable {
 			Configuration blobClientConfig) throws IOException {
 		if (!userJars.isEmpty()) {
 			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
+			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, jobID, userJars);
 
 			for (BlobKey blobKey : blobKeys) {
 				if (!userJarBlobKeys.contains(blobKey)) {
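
With this change the uploaded jars are stored under the job's ID on the server. A condensed
sketch of the call (address, port, configuration, and jar paths are illustrative placeholders;
the uploadJarFiles signature matches the one used above):

InetSocketAddress blobServerAddress = new InetSocketAddress("localhost", 6124); // placeholder port
Configuration blobClientConfig = new Configuration();
List<Path> userJars = Collections.singletonList(new Path("file:///tmp/user-code.jar"));
List<BlobKey> blobKeys =
	BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, jobGraph.getJobID(), userJars);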

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 5838cf2..c312cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobService;
+	@Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout")
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -93,7 +93,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
-			final BlobService blobService,
+			final BlobServer blobService,
 			final HeartbeatServices heartbeatServices,
 			final OnCompletionActions toNotifyOnComplete,
 			final FatalErrorHandler errorHandler) throws Exception {
@@ -116,7 +116,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
-			final BlobService blobService,
+			final BlobServer blobService,
 			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
@@ -199,6 +199,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 				haServices,
 				heartbeatServices,
 				jobManagerServices.executorService,
+				jobManagerServices.blobServer,
 				jobManagerServices.libraryCacheManager,
 				jobManagerServices.restartStrategyFactory,
 				jobManagerServices.rpcAskTimeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e14f5af..57aeaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -45,6 +44,7 @@ public class JobManagerServices {
 
 	public final ScheduledExecutorService executorService;
 
+	public final BlobServer blobServer;
 	public final BlobLibraryCacheManager libraryCacheManager;
 
 	public final RestartStrategyFactory restartStrategyFactory;
@@ -53,11 +53,13 @@ public class JobManagerServices {
 
 	public JobManagerServices(
 			ScheduledExecutorService executorService,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout) {
 
 		this.executorService = checkNotNull(executorService);
+		this.blobServer = checkNotNull(blobServer);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
 		this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
@@ -80,8 +82,9 @@ public class JobManagerServices {
 			firstException = t;
 		}
 
+		libraryCacheManager.shutdown();
 		try {
-			libraryCacheManager.shutdown();
+			blobServer.close();
 		}
 		catch (Throwable t) {
 			if (firstException == null) {
@@ -103,16 +106,12 @@ public class JobManagerServices {
 
 	public static JobManagerServices fromConfiguration(
 			Configuration config,
-			BlobService blobService) throws Exception {
+			BlobServer blobServer) throws Exception {
 
 		Preconditions.checkNotNull(config);
-		Preconditions.checkNotNull(blobService);
+		Preconditions.checkNotNull(blobServer);
 
-		final long cleanupInterval = config.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobService, cleanupInterval);
+		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer);
 
 		final FiniteDuration timeout;
 		try {
@@ -127,6 +126,7 @@ public class JobManagerServices {
 
 		return new JobManagerServices(
 			futureExecutor,
+			blobServer,
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			Time.of(timeout.length(), timeout.unit()));
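
The shutdown ordering above is deliberate: the library cache manager no longer throws and is shut
down first, then the BLOB server, which owns the actual storage, is closed. A condensed sketch
with the exception bookkeeping reduced to a comment:

libraryCacheManager.shutdown(); // releases cached class loaders, does not throw
try {
	blobServer.close(); // closes client connections and removes local BLOB storage
} catch (Throwable t) {
	// as in shutdown() above: remember the first exception and keep shutting down
}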

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d6019db..a8a8632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -149,7 +150,10 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	/** Service to contend for and retrieve the leadership of JM and RM */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	/** Blob cache manager used across jobs */
+	/** Blob server used across jobs */
+	private final BlobServer blobServer;
+
+	/** Blob library cache manager used across jobs */
 	private final BlobLibraryCacheManager libraryCacheManager;
 
 	/** The metrics for the JobManager itself */
@@ -204,6 +208,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 			HighAvailabilityServices highAvailabilityService,
 			HeartbeatServices heartbeatServices,
 			ScheduledExecutorService executor,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
@@ -221,6 +226,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 		this.configuration = checkNotNull(configuration);
 		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
+		this.blobServer = checkNotNull(blobServer);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.executor = checkNotNull(executor);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
@@ -698,7 +704,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	@Override
 	public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
 		return CompletableFuture.completedFuture(
-			new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+			new ClassloadingProps(blobServer.getPort(),
 				executionGraph.getRequiredJarFiles(),
 				executionGraph.getRequiredClasspaths()));
 	}
@@ -785,7 +791,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-				resourceId, libraryCacheManager.getBlobServerPort());
+				resourceId, blobServer.getPort());
 			return CompletableFuture.completedFuture(response);
 		} else {
 			return getRpcService()
@@ -819,7 +825,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 							}
 						});
 
-						return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
+						return new JMTMRegistrationSuccess(resourceId, blobServer.getPort());
 					},
 					getMainThreadExecutor());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 98c7bf1..363c107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -53,6 +54,9 @@ public class JobManagerConnection {
 	// Checkpoint responder for the specific job manager
 	private final CheckpointResponder checkpointResponder;
 
+	// BLOB cache connected to the BLOB server at the specific job manager
+	private final BlobCache blobCache;
+
 	// Library cache manager connected to the specific job manager
 	private final LibraryCacheManager libraryCacheManager;
 
@@ -63,21 +67,22 @@ public class JobManagerConnection {
 	private final PartitionProducerStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
-		JobID jobID,
-		ResourceID resourceID,
-		JobMasterGateway jobMasterGateway,
-		UUID leaderId,
-		TaskManagerActions taskManagerActions,
-		CheckpointResponder checkpointResponder,
-		LibraryCacheManager libraryCacheManager,
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-		PartitionProducerStateChecker partitionStateChecker) {
+				JobID jobID,
+				ResourceID resourceID,
+				JobMasterGateway jobMasterGateway,
+				UUID leaderId,
+				TaskManagerActions taskManagerActions,
+				CheckpointResponder checkpointResponder,
+				BlobCache blobCache,
+				LibraryCacheManager libraryCacheManager,
+				ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+				PartitionProducerStateChecker partitionStateChecker) {
 		this.jobID = Preconditions.checkNotNull(jobID);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
 		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+		this.blobCache = Preconditions.checkNotNull(blobCache);
 		this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
 		this.resultPartitionConsumableNotifier = Preconditions.checkNotNull(resultPartitionConsumableNotifier);
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
@@ -111,6 +116,15 @@ public class JobManagerConnection {
 		return libraryCacheManager;
 	}
 
+	/**
+	 * Gets the BLOB cache connected to the respective BLOB server instance at the job manager.
+	 *
+	 * @return BLOB cache
+	 */
+	public BlobCache getBlobCache() {
+		return blobCache;
+	}
+
 	public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier() {
 		return resultPartitionConsumableNotifier;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4abcdf4..a5ce84b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -352,6 +352,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 			TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
 			CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+			BlobCache blobCache = jobManagerConnection.getBlobCache();
 			LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
 			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
 			PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
@@ -374,6 +375,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				taskManagerActions,
 				inputSplitProvider,
 				checkpointResponder,
+				blobCache,
 				libraryCache,
 				fileCache,
 				taskManagerConfiguration,
@@ -935,14 +937,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
 
 		final LibraryCacheManager libraryCacheManager;
+		final BlobCache blobCache;
 		try {
-			final BlobCache blobCache = new BlobCache(
+			blobCache = new BlobCache(
 				blobServerAddress,
 				taskManagerConfiguration.getConfiguration(),
 				haServices.createBlobStore());
-			libraryCacheManager = new BlobLibraryCacheManager(
-				blobCache,
-				taskManagerConfiguration.getCleanupInterval());
+			libraryCacheManager = new BlobLibraryCacheManager(blobCache);
 		} catch (IOException e) {
 			// Can't pass the IOException up - we need a RuntimeException anyway
 			// two levels up where this is run asynchronously. Also, we don't
@@ -967,6 +968,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			jobManagerLeaderId,
 			taskManagerActions,
 			checkpointResponder,
+			blobCache,
 			libraryCacheManager,
 			resultPartitionConsumableNotifier,
 			partitionStateChecker);
@@ -977,6 +979,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
 		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
+		jobManagerConnection.getBlobCache().close();
 	}
 
 	// ------------------------------------------------------------------------
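
In short, each job manager connection on the TaskExecutor now wires up its own BlobCache plus a
BlobLibraryCacheManager on top of it, and tears both down on disconnect. A condensed sketch with
placeholder variables (the constructor signatures match the diff):

BlobCache blobCache = new BlobCache(blobServerAddress, configuration, blobStore);
LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobCache);
// ... run tasks for this job manager ...
libraryCacheManager.shutdown(); // release class loaders first
blobCache.close();              // then close the cache and its server connection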

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index ea9f576..7c7693b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -53,8 +54,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 	private final Time maxRegistrationPause;
 	private final Time refusedRegistrationPause;
 
-	private final long cleanupInterval;
-
 	private final UnmodifiableConfiguration configuration;
 
 	private final boolean exitJvmOnOutOfMemory;
@@ -78,7 +77,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
 		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
-		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
 		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 	}
@@ -107,10 +105,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return refusedRegistrationPause;
 	}
 
-	public long getCleanupInterval() {
-		return cleanupInterval;
-	}
-
 	@Override
 	public Configuration getConfiguration() {
 		return configuration;
@@ -153,9 +147,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
 		LOG.info("Messages have a max timeout of " + timeout);
 
-		final long cleanupInterval = configuration.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+		final long cleanupInterval = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
 
 		final Time finiteRegistrationDuration;
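
The cleanup interval is now read from BlobServerOptions (a value in seconds, converted to
milliseconds at the call site) instead of the previous ConfigConstants key. An illustrative
snippet, with the interval value chosen arbitrarily:

Configuration configuration = new Configuration();
configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3600L); // seconds
long cleanupIntervalMillis = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;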
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 04cb990..d628960 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -203,7 +204,10 @@ public class Task implements Runnable, TaskActions {
 	/** All listener that want to be notified about changes in the task's execution state */
 	private final List<TaskExecutionStateListener> taskExecutionStateListeners;
 
-	/** The library cache, from which the task can request its required JAR files */
+	/** The BLOB cache, from which the task can request BLOB files */
+	private final BlobCache blobCache;
+
+	/** The library cache, from which the task can request its class loader */
 	private final LibraryCacheManager libraryCache;
 
 	/** The cache for user-defined files that the invokable requires */
@@ -282,6 +286,7 @@ public class Task implements Runnable, TaskActions {
 		TaskManagerActions taskManagerActions,
 		InputSplitProvider inputSplitProvider,
 		CheckpointResponder checkpointResponder,
+		BlobCache blobCache,
 		LibraryCacheManager libraryCache,
 		FileCache fileCache,
 		TaskManagerRuntimeInfo taskManagerConfig,
@@ -330,6 +335,7 @@ public class Task implements Runnable, TaskActions {
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
 		this.taskManagerActions = checkNotNull(taskManagerActions);
 
+		this.blobCache = Preconditions.checkNotNull(blobCache);
 		this.libraryCache = Preconditions.checkNotNull(libraryCache);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
 		this.network = Preconditions.checkNotNull(networkEnvironment);
@@ -568,6 +574,8 @@ public class Task implements Runnable, TaskActions {
 			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystemSafetyNet.initializeSafetyNetForThread();
 
+			blobCache.registerJob(jobId);
+
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task {}.", this);
@@ -827,6 +835,7 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all of the tasks library resources
 				libraryCache.unregisterTask(jobId, executionId);
+				blobCache.releaseJob(jobId);
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
@@ -862,7 +871,7 @@ public class Task implements Runnable, TaskActions {
 		// triggers the download of all missing jar files from the job manager
 		libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
 
-		LOG.debug("Register task {} at library cache manager took {} milliseconds",
+		LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",
 				executionId, System.currentTimeMillis() - startDownloadTime);
 
 		ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
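
Taken together, a task now pins the job's BLOBs before resolving its class loader and unpins them
during cleanup. A condensed sketch of that lifecycle (error handling and logging omitted):

blobCache.registerJob(jobId); // pin the job's BLOBs in the local cache
try {
	libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
	ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
	// ... instantiate and run the invokable with userCodeClassLoader ...
} finally {
	libraryCache.unregisterTask(jobId, executionId);
	blobCache.releaseJob(jobId); // unpin; deferred cleanup may now delete the files
}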


[4/5] flink git commit: [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index cd7b363..61c61b4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.clusterframework
 
-import java.util.concurrent.{ScheduledExecutorService, Executor}
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -51,6 +52,7 @@ import scala.language.postfixOps
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
+  * @param blobServer Server instance to store BLOBs for the individual tasks
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
@@ -63,6 +65,7 @@ abstract class ContaineredJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -78,6 +81,7 @@ abstract class ContaineredJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1616a7b..8c551a7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -126,6 +126,7 @@ class JobManager(
     protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
+    protected val blobServer: BlobServer,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
     protected val restartStrategyFactory: RestartStrategyFactory,
@@ -272,11 +273,12 @@ class JobManager(
 
     instanceManager.shutdown()
     scheduler.shutdown()
+    libraryCacheManager.shutdown()
 
     try {
-      libraryCacheManager.shutdown()
+      blobServer.close()
     } catch {
-      case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
+      case e: IOException => log.error("Could not properly shutdown the blob server.", e)
     }
 
     // failsafe shutdown of the metrics registry
@@ -422,7 +424,7 @@ class JobManager(
         taskManager ! decorateMessage(
           AlreadyRegistered(
             instanceID,
-            libraryCacheManager.getBlobServerPort))
+            blobServer.getPort))
       } else {
         try {
           val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
@@ -437,7 +439,7 @@ class JobManager(
           taskManagerMap.put(taskManager, instanceID)
 
           taskManager ! decorateMessage(
-            AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))
+            AcknowledgeRegistration(instanceID, blobServer.getPort))
 
           // to be notified when the taskManager is no longer reachable
           context.watch(taskManager)
@@ -839,6 +841,7 @@ class JobManager(
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
           //TODO user code class loader ?
+          // (it has not been used so far, and new savepoints can simply be deleted as files)
           val savepoint = SavepointStore.loadSavepoint(
             savepointPath,
             Thread.currentThread().getContextClassLoader)
@@ -1060,7 +1063,7 @@ class JobManager(
         case Some((graph, jobInfo)) =>
           sender() ! decorateMessage(
             ClassloadingProps(
-              libraryCacheManager.getBlobServerPort,
+              blobServer.getPort,
               graph.getRequiredJarFiles,
               graph.getRequiredClasspaths))
         case None =>
@@ -1068,7 +1071,7 @@ class JobManager(
       }
 
     case RequestBlobManagerPort =>
-      sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
+      sender ! decorateMessage(blobServer.getPort)
 
     case RequestArchive =>
       sender ! decorateMessage(ResponseArchive(archive))
@@ -1254,8 +1257,8 @@ class JobManager(
         // because this makes sure that the uploaded jar files are removed in case of
         // unsuccessful
         try {
-          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
-            jobGraph.getClasspaths)
+          libraryCacheManager.registerJob(
+            jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
         }
         catch {
           case t: Throwable =>
@@ -1344,6 +1347,7 @@ class JobManager(
           log.error(s"Failed to submit job $jobId ($jobName)", t)
 
           libraryCacheManager.unregisterJob(jobId)
+          blobServer.cleanupJob(jobId)
           currentJobs.remove(jobId)
 
           if (executionGraph != null) {
@@ -1785,12 +1789,10 @@ class JobManager(
       case None => None
     }
 
-    try {
-      libraryCacheManager.unregisterJob(jobID)
-    } catch {
-      case t: Throwable =>
-        log.error(s"Could not properly unregister job $jobID from the library cache.", t)
-    }
+    // remove all job-related BLOBs from local and HA store
+    libraryCacheManager.unregisterJob(jobID)
+    blobServer.cleanupJob(jobID)
+
     jobManagerMetricGroup.foreach(_.removeJob(jobID))
 
     futureOption
@@ -2463,6 +2465,7 @@ object JobManager {
       blobStore: BlobStore) :
     (InstanceManager,
     FlinkScheduler,
+    BlobServer,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
     FiniteDuration, // timeout
@@ -2474,10 +2477,6 @@ object JobManager {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
-    val cleanupInterval = configuration.getLong(
-      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-      ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
-
     val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
     val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT)
@@ -2508,21 +2507,21 @@ object JobManager {
       blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
-      libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
+      libraryCacheManager = new BlobLibraryCacheManager(blobServer)
 
       instanceManager.addInstanceListener(scheduler)
     }
     catch {
       case t: Throwable =>
-        if (libraryCacheManager != null) {
-          libraryCacheManager.shutdown()
-        }
         if (scheduler != null) {
           scheduler.shutdown()
         }
         if (instanceManager != null) {
           instanceManager.shutdown()
         }
+        if (libraryCacheManager != null) {
+          libraryCacheManager.shutdown()
+        }
         if (blobServer != null) {
           blobServer.close()
         }
@@ -2554,6 +2553,7 @@ object JobManager {
 
     (instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       restartStrategy,
       timeout,
@@ -2627,6 +2627,7 @@ object JobManager {
 
     val (instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     restartStrategy,
     timeout,
@@ -2654,6 +2655,7 @@ object JobManager {
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategy,
@@ -2693,6 +2695,7 @@ object JobManager {
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: LibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -2710,6 +2713,7 @@ object JobManager {
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 0ae00a9..dcf9dd0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
@@ -133,6 +134,7 @@ class LocalFlinkMiniCluster(
 
     val (instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     restartStrategyFactory,
     timeout,
@@ -164,6 +166,7 @@ class LocalFlinkMiniCluster(
         ioExecutor,
         instanceManager,
         scheduler,
+        blobServer,
         libraryCacheManager,
         archive,
         restartStrategyFactory,
@@ -279,6 +282,7 @@ class LocalFlinkMiniCluster(
       ioExecutor: Executor,
       instanceManager: InstanceManager,
       scheduler: Scheduler,
+      blobServer: BlobServer,
       libraryCacheManager: BlobLibraryCacheManager,
       archive: ActorRef,
       restartStrategyFactory: RestartStrategyFactory,
@@ -297,6 +301,7 @@ class LocalFlinkMiniCluster(
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0c419eb..431adb6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,7 +35,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
-import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
+import org.apache.flink.runtime.blob.{BlobCache, BlobClient}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -160,7 +160,7 @@ class TaskManager(
     * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
-  private var blobService: Option[BlobService] = None
+  private var blobCache: Option[BlobCache] = None
   private var libraryCacheManager: Option[LibraryCacheManager] = None
 
   /* The current leading JobManager Actor associated with */
@@ -333,11 +333,11 @@ class TaskManager(
       killTaskManagerFatal(message, cause)
 
     case RequestTaskManagerLog(requestType : LogTypeRequest) =>
-      blobService match {
+      blobCache match {
         case Some(_) =>
           handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
         case None =>
-          sender() ! akka.actor.Status.Failure(new IOException("BlobService not " +
+          sender() ! akka.actor.Status.Failure(new IOException("BlobCache not " +
             "available. Cannot upload TaskManager logs."))
       }
 
@@ -840,7 +840,7 @@ class TaskManager(
         if (file.exists()) {
           val fis = new FileInputStream(file);
           Future {
-            val client: BlobClient = blobService.get.createClient()
+            val client: BlobClient = blobCache.get.createClient()
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
@@ -915,7 +915,7 @@ class TaskManager(
       "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up currently
-    if (connectionUtils.isDefined || blobService.isDefined) {
+    if (connectionUtils.isDefined || blobCache.isDefined) {
       throw new IllegalStateException("JobManager-specific components are already initialized.")
     }
 
@@ -968,9 +968,9 @@ class TaskManager(
           address,
           config.getConfiguration(),
           highAvailabilityServices.createBlobStore())
-        blobService = Option(blobcache)
+        blobCache = Option(blobcache)
         libraryCacheManager = Some(
-          new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
+          new BlobLibraryCacheManager(blobcache))
       }
       catch {
         case e: Exception =>
@@ -1047,18 +1047,11 @@ class TaskManager(
 
     // shut down BLOB and library cache
     libraryCacheManager foreach {
-      manager =>
-        try {
-          manager.shutdown()
-        } catch {
-          case ioe: IOException => log.error(
-            "Could not properly shutdown library cache manager.",
-            ioe)
-        }
+      manager => manager.shutdown()
     }
     libraryCacheManager = None
 
-    blobService foreach {
+    blobCache foreach {
       service =>
         try {
           service.close()
@@ -1066,7 +1059,7 @@ class TaskManager(
           case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe)
         }
     }
-    blobService = None
+    blobCache = None
 
     // disassociate the slot environment
     connectionUtils = None
@@ -1130,6 +1123,10 @@ class TaskManager(
         case Some(manager) => manager
         case None => throw new IllegalStateException("There is no valid library cache manager.")
       }
+      val blobCache = this.blobCache match {
+        case Some(manager) => manager
+        case None => throw new IllegalStateException("There is no valid BLOB cache.")
+      }
 
       val slot = tdd.getTargetSlotNumber
       if (slot < 0 || slot >= numberOfSlots) {
@@ -1200,6 +1197,7 @@ class TaskManager(
         taskManagerConnection,
         inputSplitProvider,
         checkpointResponder,
+        blobCache,
         libCache,
         fileCache,
         config,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
new file mode 100644
index 0000000..afd365b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
+ */
+public class BlobCacheCleanupTest extends TestLogger {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}.
+	 */
+	@Test
+	public void testJobCleanup() throws IOException, InterruptedException {
+
+		JobID jobId = new JobID();
+		List<BlobKey> keys = new ArrayList<BlobKey>();
+		BlobServer server = null;
+		BlobCache cache = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+			server = new BlobServer(config, new VoidBlobStore());
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+
+			// upload blobs
+			try (BlobClient bc = new BlobClient(serverAddress, config)) {
+				keys.add(bc.put(jobId, buf));
+				buf[0] += 1;
+				keys.add(bc.put(jobId, buf));
+			}
+
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
+
+			// register once
+			cache.registerJob(jobId);
+
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
+
+			for (BlobKey key : keys) {
+				cache.getFile(jobId, key);
+			}
+
+			// register again (let's say, from another thread or so)
+			cache.registerJob(jobId);
+			for (BlobKey key : keys) {
+				cache.getFile(jobId, key);
+			}
+
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// after releasing once, nothing should change
+			cache.releaseJob(jobId);
+
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// after releasing the second time, the job is up for deferred cleanup
+			cache.releaseJob(jobId);
+
+			// since we cannot rule out thread races on the build machines, we poll
+			// for a while until the references disappear
+			{
+				long deadline = System.currentTimeMillis() + 30_000L;
+				do {
+					Thread.sleep(100);
+				}
+				while (checkFilesExist(jobId, keys, cache, false) != 0 &&
+					System.currentTimeMillis() < deadline);
+			}
+
+			// the blob cache should no longer contain the files
+			// this fails if we exited via a timeout
+			checkFileCountForJob(0, jobId, cache);
+			// server should be unaffected
+			checkFileCountForJob(2, jobId, server);
+		}
+		finally {
+			if (cache != null) {
+				cache.close();
+			}
+
+			if (server != null) {
+				server.close();
+			}
+			// now everything should be cleaned up
+			checkFileCountForJob(0, jobId, server);
+		}
+	}
+
+	/**
+	 * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}
+	 * but only after preserving the file for a bit longer.
+	 */
+	@Test
+	@Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ")
+	public void testJobDeferredCleanup() throws IOException, InterruptedException {
+		// file should be deleted between 5 and 10s after last job release
+		long cleanupInterval = 5L;
+
+		JobID jobId = new JobID();
+		List<BlobKey> keys = new ArrayList<BlobKey>();
+		BlobServer server = null;
+		BlobCache cache = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+			server = new BlobServer(config, new VoidBlobStore());
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+
+			// upload blobs
+			try (BlobClient bc = new BlobClient(serverAddress, config)) {
+				keys.add(bc.put(jobId, buf));
+				buf[0] += 1;
+				keys.add(bc.put(jobId, buf));
+			}
+
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
+
+			// register once
+			cache.registerJob(jobId);
+
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
+
+			for (BlobKey key : keys) {
+				cache.getFile(jobId, key);
+			}
+
+			// register again (e.g. from another thread)
+			cache.registerJob(jobId);
+			for (BlobKey key : keys) {
+				cache.getFile(jobId, key);
+			}
+
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// after releasing once, nothing should change
+			cache.releaseJob(jobId);
+
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// after releasing the second time, the job is up for deferred cleanup
+			cache.releaseJob(jobId);
+
+			// files should still be accessible for now
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, cache);
+
+			Thread.sleep(cleanupInterval * 1_000L / 5); // the interval is in s, sleep takes ms
+			// still accessible...
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, cache);
+
+			Thread.sleep((cleanupInterval * 4_000L) / 5);
+
+			// files are up for cleanup now...wait for it:
+			// because we cannot guarantee that there are no thread races in the build system, we
+			// loop for a while until the references disappear
+			{
+				long deadline = System.currentTimeMillis() + 30_000L;
+				do {
+					Thread.sleep(100);
+				}
+				while (checkFilesExist(jobId, keys, cache, false) != 0 &&
+					System.currentTimeMillis() < deadline);
+			}
+
+			// the blob cache should no longer contain the files
+			// this fails if we exited via a timeout
+			checkFileCountForJob(0, jobId, cache);
+			// server should be unaffected
+			checkFileCountForJob(2, jobId, server);
+		}
+		finally {
+			if (cache != null) {
+				cache.close();
+			}
+
+			if (server != null) {
+				server.close();
+			}
+			// now everything should be cleaned up
+			checkFileCountForJob(0, jobId, server);
+		}
+	}
+
+	/**
+	 * Checks how many of the files given by blob keys are accessible.
+	 *
+	 * @param jobId
+	 * 		ID of a job
+	 * @param keys
+	 * 		blob keys to check
+	 * @param blobService
+	 * 		BLOB store to use
+	 * @param doThrow
+	 * 		whether an exception should be thrown if a file does not exist (<tt>true</tt>)
+	 * 		or ignored (<tt>false</tt>)
+	 *
+	 * @return number of files from <tt>keys</tt> that exist in the BLOB service's local storage
+	 */
+	public static int checkFilesExist(
+		JobID jobId, Collection<BlobKey> keys, BlobService blobService, boolean doThrow)
+		throws IOException {
+
+		int numFiles = 0;
+
+		for (BlobKey key : keys) {
+			final File blobFile;
+			if (blobService instanceof BlobServer) {
+				BlobServer server = (BlobServer) blobService;
+				blobFile = server.getStorageLocation(jobId, key);
+			} else {
+				BlobCache cache = (BlobCache) blobService;
+				blobFile = cache.getStorageLocation(jobId, key);
+			}
+			if (blobFile.exists()) {
+				++numFiles;
+			} else if (doThrow) {
+				throw new IOException("File " + blobFile + " does not exist.");
+			}
+		}
+
+		return numFiles;
+	}
+
+	/**
+	 * Checks that the given job's directory at the BLOB service contains exactly the expected
+	 * number of files.
+	 *
+	 * @param expectedCount
+	 * 		number of expected files in the blob service for the given job
+	 * @param jobId
+	 * 		ID of a job
+	 * @param blobService
+	 * 		BLOB store to use
+	 */
+	public static void checkFileCountForJob(
+		int expectedCount, JobID jobId, BlobService blobService)
+		throws IOException {
+
+		final File jobDir;
+		if (blobService instanceof BlobServer) {
+			BlobServer server = (BlobServer) blobService;
+			jobDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+		} else {
+			BlobCache cache = (BlobCache) blobService;
+			jobDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile();
+		}
+		File[] blobsForJob = jobDir.listFiles();
+		if (blobsForJob == null) {
+			if (expectedCount != 0) {
+				throw new IOException("Job directory " + jobDir + " does not exist.");
+			}
+		} else {
+			assertEquals("Unexpected number of files in job dir: " +
+					Arrays.asList(blobsForJob), expectedCount,
+				blobsForJob.length);
+		}
+	}
+}
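
For reference, the contract that the cleanup tests above pin down is a simple
per-job reference count on the BlobCache. A minimal sketch, assuming the
registerJob/releaseJob API introduced by this change (the storage directory
and interval values are placeholders):

    Configuration config = new Configuration();
    config.setString(BlobServerOptions.STORAGE_DIRECTORY, "/tmp/blob-storage");
    config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); // deferred-cleanup period in seconds

    BlobServer server = new BlobServer(config, new VoidBlobStore());
    BlobCache cache = new BlobCache(
        new InetSocketAddress("localhost", server.getPort()), config, new VoidBlobStore());

    JobID jobId = new JobID();
    cache.registerJob(jobId);  // first reference pins the job's BLOBs
    cache.registerJob(jobId);  // e.g. a second thread takes another reference

    cache.releaseJob(jobId);   // still referenced -> cached files remain
    cache.releaseJob(jobId);   // last reference gone -> files are up for deferred cleanup

    cache.close();
    server.close();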

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 8c575a9..0060ccb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -37,7 +39,7 @@ import static org.junit.Assert.*;
 /**
  * Unit tests for the blob cache retrying the connection to the server.
  */
-public class BlobCacheRetriesTest {
+public class BlobCacheRetriesTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index d511e86..6d6bfd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -39,6 +39,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -46,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * This class contains unit tests for the {@link BlobClient}.
  */
-public class BlobClientTest {
+public class BlobClientTest extends TestLogger {
 
 	/** The buffer size used during the tests in bytes. */
 	private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -214,7 +216,7 @@ public class BlobClientTest {
 	 * Tests the PUT/GET operations for content-addressable buffers.
 	 */
 	@Test
-	public void testContentAddressableBuffer() {
+	public void testContentAddressableBuffer() throws IOException {
 
 		BlobClient client = null;
 
@@ -256,10 +258,6 @@ public class BlobClientTest {
 				// expected
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			if (client != null) {
 				try {
@@ -281,7 +279,7 @@ public class BlobClientTest {
 	 * Tests the PUT/GET operations for content-addressable streams.
 	 */
 	@Test
-	public void testContentAddressableStream() {
+	public void testContentAddressableStream() throws IOException {
 
 		BlobClient client = null;
 		InputStream is = null;
@@ -313,10 +311,6 @@ public class BlobClientTest {
 			validateGetAndClose(client.get(receivedKey), testFile);
 			validateGetAndClose(client.get(jobId, receivedKey), testFile);
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			if (is != null) {
 				try {
@@ -332,7 +326,7 @@ public class BlobClientTest {
 	}
 
 	/**
-	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
+	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
 	 */
 	@Test
 	public void testUploadJarFilesHelper() throws Exception {
@@ -340,7 +334,7 @@ public class BlobClientTest {
 	}
 
 	/**
-	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
+	 * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
 	 */
 	static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
 		final File testFile = File.createTempFile("testfile", ".dat");
@@ -354,15 +348,16 @@ public class BlobClientTest {
 	}
 
 	private static void uploadJarFile(
-		final InetSocketAddress serverAddress, final Configuration blobClientConfig,
-		final File testFile) throws IOException {
+			final InetSocketAddress serverAddress, final Configuration blobClientConfig,
+			final File testFile) throws IOException {
+		JobID jobId = new JobID();
 		List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
-			Collections.singletonList(new Path(testFile.toURI())));
+			jobId, Collections.singletonList(new Path(testFile.toURI())));
 
 		assertEquals(1, blobKeys.size());
 
 		try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
-			validateGetAndClose(blobClient.get(blobKeys.get(0)), testFile);
+			validateGetAndClose(blobClient.get(jobId, blobKeys.get(0)), testFile);
 		}
 	}
 }
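
With the signature change above, uploads and downloads are job-scoped. A
condensed sketch of the new call shape, assuming the API shown in this diff
(serverAddress and clientConfig are placeholders, as is the jar path):

    JobID jobId = new JobID();
    List<BlobKey> keys = BlobClient.uploadJarFiles(
        serverAddress, clientConfig, jobId,
        Collections.singletonList(new Path("/path/to/job.jar")));

    try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
        // GETs name the job as well; always close the returned stream,
        // otherwise the server may keep holding its read lock (see FLINK-7467)
        try (InputStream is = client.get(jobId, keys.get(0))) {
            // ... consume the BLOB's contents ...
        }
    }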

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 4071a1c..43bc622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -29,12 +29,14 @@ import java.io.IOException;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 /**
  * This class contains unit tests for the {@link BlobKey} class.
  */
-public final class BlobKeyTest {
+public final class BlobKeyTest extends TestLogger {
 	/**
 	 * The first key array to be used during the unit tests.
 	 */
@@ -106,4 +108,4 @@ public final class BlobKeyTest {
 
 		assertEquals(k1, k2);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 413e2e9..6bb5ab5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -62,7 +64,7 @@ public class BlobServerDeleteTest extends TestLogger {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void testDeleteSingleByBlobKey() {
+	public void testDeleteSingleByBlobKey() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
@@ -131,10 +133,6 @@ public class BlobServerDeleteTest extends TestLogger {
 				// expected
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			cleanup(server, client);
 		}
@@ -153,16 +151,16 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteAlreadyDeletedNoJob() {
+	public void testDeleteAlreadyDeletedNoJob() throws IOException {
 		testDeleteAlreadyDeleted(null);
 	}
 
 	@Test
-	public void testDeleteAlreadyDeletedForJob() {
+	public void testDeleteAlreadyDeletedForJob() throws IOException {
 		testDeleteAlreadyDeleted(new JobID());
 	}
 
-	private void testDeleteAlreadyDeleted(final JobID jobId) {
+	private void testDeleteAlreadyDeleted(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
@@ -201,10 +199,6 @@ public class BlobServerDeleteTest extends TestLogger {
 				server.delete(jobId, key);
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			cleanup(server, client);
 		}
@@ -219,16 +213,16 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteFailsNoJob() {
+	public void testDeleteFailsNoJob() throws IOException {
 		testDeleteFails(null);
 	}
 
 	@Test
-	public void testDeleteFailsForJob() {
+	public void testDeleteFailsForJob() throws IOException {
 		testDeleteFails(new JobID());
 	}
 
-	private void testDeleteFails(final JobID jobId) {
+	private void testDeleteFails(final JobID jobId) throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -275,9 +269,6 @@ public class BlobServerDeleteTest extends TestLogger {
 			} else {
 				server.getFile(jobId, key);
 			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
 		} finally {
 			if (blobFile != null && directory != null) {
 				//noinspection ResultOfMethodCallIgnored
@@ -290,6 +281,64 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
+	 */
+	@Test
+	public void testJobCleanup() throws IOException, InterruptedException {
+
+		JobID jobId1 = new JobID();
+		List<BlobKey> keys1 = new ArrayList<BlobKey>();
+		JobID jobId2 = new JobID();
+		List<BlobKey> keys2 = new ArrayList<BlobKey>();
+		BlobServer server = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+				temporaryFolder.newFolder().getAbsolutePath());
+
+			server = new BlobServer(config, new VoidBlobStore());
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			BlobClient bc = new BlobClient(serverAddress, config);
+
+			keys1.add(bc.put(jobId1, buf));
+			keys2.add(bc.put(jobId2, buf));
+			assertEquals(keys2.get(0), keys1.get(0));
+
+			buf[0] += 1;
+			keys1.add(bc.put(jobId1, buf));
+
+			bc.close();
+
+			assertEquals(2, checkFilesExist(jobId1, keys1, server, true));
+			checkFileCountForJob(2, jobId1, server);
+			assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+			checkFileCountForJob(1, jobId2, server);
+
+			server.cleanupJob(jobId1);
+
+			checkFileCountForJob(0, jobId1, server);
+			assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+			checkFileCountForJob(1, jobId2, server);
+
+			server.cleanupJob(jobId2);
+
+			checkFileCountForJob(0, jobId1, server);
+			checkFileCountForJob(0, jobId2, server);
+
+			// calling a second time should not fail
+			server.cleanupJob(jobId2);
+		}
+		finally {
+			if (server != null) {
+				server.close();
+			}
+		}
+	}
+
+	/**
 	 * FLINK-6020
 	 *
 	 * Tests that concurrent delete operations don't interfere with each other.
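
The testJobCleanup case added above pins down the server-side semantics of
cleanupJob(JobID); in short, and assuming the API from this diff:

    server.cleanupJob(jobId1);  // removes all of jobId1's BLOBs from local storage
    server.cleanupJob(jobId1);  // idempotent: calling it again is not an error
    // BLOBs of other jobs (jobId2, ...) stay untouched until their own cleanupJob() call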

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index e449aab..a6ac447 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +35,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
 
-public class BlobUtilsTest {
+public class BlobUtilsTest extends TestLogger {
 
 	private final static String CANNOT_CREATE_THIS = "cannot-create-this";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ec1bbd8..c58e3a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -190,26 +191,4 @@ public class CoordinatorShutdownTest extends TestLogger {
 		}
 	}
 
-	public static class FailingBlockingInvokable extends AbstractInvokable {
-		private static boolean blocking = true;
-		private static final Object lock = new Object();
-
-		@Override
-		public void invoke() throws Exception {
-			while (blocking) {
-				synchronized (lock) {
-					lock.wait();
-				}
-			}
-			throw new RuntimeException("This exception is expected.");
-		}
-
-		public static void unblock() {
-			blocking = false;
-
-			synchronized (lock) {
-				lock.notifyAll();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3814684..4237327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -137,7 +136,7 @@ public class DispatcherTest extends TestLogger {
 				Configuration configuration,
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
-				BlobService blobService,
+				BlobServer blobServer,
 				HeartbeatServices heartbeatServices,
 				MetricRegistry metricRegistry,
 				OnCompletionActions onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index b43a307..a4b48e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,24 +18,22 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.*;
-import static org.junit.Assume.assumeTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -45,7 +43,19 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-public class BlobLibraryCacheManagerTest {
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link BlobLibraryCacheManager}.
+ */
+public class BlobLibraryCacheManagerTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -57,10 +67,13 @@ public class BlobLibraryCacheManagerTest {
 	@Test
 	public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedException {
 
-		JobID jid = new JobID();
-		List<BlobKey> keys = new ArrayList<BlobKey>();
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+		List<BlobKey> keys1 = new ArrayList<>();
+		List<BlobKey> keys2 = new ArrayList<>();
 		BlobServer server = null;
-		BlobLibraryCacheManager libraryCacheManager = null;
+		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
 
 		final byte[] buf = new byte[128];
 
@@ -68,122 +81,231 @@ public class BlobLibraryCacheManagerTest {
 			Configuration config = new Configuration();
 			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
 
 			server = new BlobServer(config, new VoidBlobStore());
-			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
-			BlobClient bc = new BlobClient(blobSocketAddress, config);
-
-			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			JobID jobId = null;
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			BlobClient bc = new BlobClient(serverAddress, config);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
-			keys.add(bc.put(jobId, buf));
+			keys1.add(bc.put(jobId1, buf));
 			buf[0] += 1;
-			keys.add(bc.put(jobId, buf));
+			keys1.add(bc.put(jobId1, buf));
+			keys2.add(bc.put(jobId2, buf));
 
 			bc.close();
 
-			long cleanupInterval = 1000L;
-			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
-			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
+			libCache = new BlobLibraryCacheManager(cache);
+			cache.registerJob(jobId1);
+			cache.registerJob(jobId2);
+
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1));
+			checkFileCountForJob(2, jobId1, server);
+			checkFileCountForJob(0, jobId1, cache);
+			checkFileCountForJob(1, jobId2, server);
+			checkFileCountForJob(0, jobId2, cache);
+
+			libCache.registerJob(jobId1, keys1, Collections.<URL>emptyList());
+			ClassLoader classLoader1 = libCache.getClassLoader(jobId1);
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1));
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2));
+			assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+			checkFileCountForJob(2, jobId1, server);
+			checkFileCountForJob(2, jobId1, cache);
+			assertEquals(0, checkFilesExist(jobId2, keys2, cache, false));
+			checkFileCountForJob(1, jobId2, server);
+			checkFileCountForJob(0, jobId2, cache);
+
+			libCache.registerJob(jobId2, keys2, Collections.<URL>emptyList());
+			ClassLoader classLoader2 = libCache.getClassLoader(jobId2);
+			assertNotEquals(classLoader1, classLoader2);
 
-			assertEquals(2, checkFilesExist(jobId, keys, server, true));
-			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
-			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
-
-			libraryCacheManager.unregisterJob(jid);
-
-			// because we cannot guarantee that there are not thread races in the build system, we
-			// loop for a certain while until the references disappear
-			{
-				long deadline = System.currentTimeMillis() + 30000;
-				do {
-					Thread.sleep(500);
-				}
-				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
-					System.currentTimeMillis() < deadline);
+			try {
+				libCache.registerJob(jobId2, keys1, Collections.<URL>emptyList());
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's what we want
 			}
-
-			// this fails if we exited via a timeout
-			assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
-			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
-
-			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(jobId, keys, server, false));
 
 			try {
-				if (jobId == null) {
-					server.getFile(keys.get(0));
-				} else {
-					server.getFile(jobId, keys.get(0));
-				}
-				fail("BLOB should have been deleted");
-			} catch (IOException e) {
-				// expected
+				libCache.registerJob(
+					jobId2, keys2,
+					Collections.singletonList(new URL("file:///tmp/does-not-exist")));
+				fail("Should fail with an IllegalStateException");
 			}
-			try {
-				if (jobId == null) {
-					server.getFile(keys.get(1));
-				} else {
-					server.getFile(jobId, keys.get(1));
-				}
-				fail("BLOB should have been deleted");
-			} catch (IOException e) {
-				// expected
+			catch (IllegalStateException e) {
+				// that's what we want
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+
+			assertEquals(2, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1));
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2));
+			assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+			checkFileCountForJob(2, jobId1, server);
+			checkFileCountForJob(2, jobId1, cache);
+			assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+			checkFileCountForJob(1, jobId2, server);
+			checkFileCountForJob(1, jobId2, cache);
+
+			libCache.unregisterJob(jobId1);
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1));
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2));
+			assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+			checkFileCountForJob(2, jobId1, server);
+			checkFileCountForJob(2, jobId1, cache);
+			assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+			checkFileCountForJob(1, jobId2, server);
+			checkFileCountForJob(1, jobId2, cache);
+
+			libCache.unregisterJob(jobId2);
+
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1));
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2));
+			assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+			checkFileCountForJob(2, jobId1, server);
+			checkFileCountForJob(2, jobId1, cache);
+			assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+			checkFileCountForJob(1, jobId2, server);
+			checkFileCountForJob(1, jobId2, cache);
+
+			// only calls to BlobCache#releaseJob() clean up files (tested in BlobCacheCleanupTest etc.)
 		}
 		finally {
-			if (server != null) {
-				server.close();
+			if (libCache != null) {
+				libCache.shutdown();
 			}
 
-			if (libraryCacheManager != null) {
-				try {
-					libraryCacheManager.shutdown();
-				}
-				catch (IOException e) {
-					e.printStackTrace();
-				}
+			// should have been closed by the libraryCacheManager, but just in case
+			if (cache != null) {
+				cache.close();
+			}
+
+			if (server != null) {
+				server.close();
 			}
 		}
 	}
 
 	/**
-	 * Checks how many of the files given by blob keys are accessible.
-	 *
-	 * @param keys
-	 * 		blob keys to check
-	 * @param blobService
-	 * 		BLOB store to use
-	 * @param doThrow
-	 * 		whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
-	 *
-	 * @return number of files we were able to retrieve via {@link BlobService#getFile}
+	 * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link
+	 * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}.
 	 */
-	private static int checkFilesExist(
-			JobID jobId, List<BlobKey> keys, BlobService blobService, boolean doThrow)
-			throws IOException {
-		int numFiles = 0;
+	@Test
+	public void testLibraryCacheManagerTaskCleanup() throws IOException, InterruptedException {
+
+		JobID jobId = new JobID();
+		ExecutionAttemptID attempt1 = new ExecutionAttemptID();
+		ExecutionAttemptID attempt2 = new ExecutionAttemptID();
+		List<BlobKey> keys = new ArrayList<>();
+		BlobServer server = null;
+		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+			server = new BlobServer(config, new VoidBlobStore());
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			BlobClient bc = new BlobClient(serverAddress, config);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+
+			keys.add(bc.put(jobId, buf));
+			buf[0] += 1;
+			keys.add(bc.put(jobId, buf));
+
+			bc.close();
+
+			libCache = new BlobLibraryCacheManager(cache);
+			cache.registerJob(jobId);
+
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
+
+			libCache.registerTask(jobId, attempt1, keys, Collections.<URL>emptyList());
+			ClassLoader classLoader1 = libCache.getClassLoader(jobId);
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			libCache.registerTask(jobId, attempt2, keys, Collections.<URL>emptyList());
+			ClassLoader classLoader2 = libCache.getClassLoader(jobId);
+			assertEquals(classLoader1, classLoader2);
 
-		for (BlobKey key : keys) {
 			try {
-				if (jobId == null) {
-					blobService.getFile(key);
-				} else {
-					blobService.getFile(jobId, key);
-				}
-				++numFiles;
-			} catch (IOException e) {
-				if (doThrow) {
-					throw e;
-				}
+				libCache.registerTask(
+					jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(),
+					Collections.<URL>emptyList());
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's what we want
+			}
+
+			try {
+				libCache.registerTask(
+					jobId, new ExecutionAttemptID(), keys,
+					Collections.singletonList(new URL("file:///tmp/does-not-exist")));
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's what we want
 			}
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(2, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			libCache.unregisterTask(jobId, attempt1);
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			libCache.unregisterTask(jobId, attempt2);
+
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// only calls to BlobCache#releaseJob() clean up files (tested in BlobCacheCleanupTest etc.)
 		}
+		finally {
+			if (libCache != null) {
+				libCache.shutdown();
+			}
+
+			// should have been closed by the libraryCacheManager, but just in case
+			if (cache != null) {
+				cache.close();
+			}
 
-		return numFiles;
+			if (server != null) {
+				server.close();
+			}
+		}
 	}
 
 	/**
@@ -191,14 +313,14 @@ public class BlobLibraryCacheManagerTest {
 	 * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}.
 	 */
 	@Test
-	public void testLibraryCacheManagerTaskCleanup() throws IOException, InterruptedException {
+	public void testLibraryCacheManagerMixedJobTaskCleanup() throws IOException, InterruptedException {
 
-		JobID jid = new JobID();
-		ExecutionAttemptID executionId1 = new ExecutionAttemptID();
-		ExecutionAttemptID executionId2 = new ExecutionAttemptID();
-		List<BlobKey> keys = new ArrayList<BlobKey>();
+		JobID jobId = new JobID();
+		ExecutionAttemptID attempt1 = new ExecutionAttemptID();
+		List<BlobKey> keys = new ArrayList<>();
 		BlobServer server = null;
-		BlobLibraryCacheManager libraryCacheManager = null;
+		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
 
 		final byte[] buf = new byte[128];
 
@@ -206,67 +328,96 @@ public class BlobLibraryCacheManagerTest {
 			Configuration config = new Configuration();
 			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
 
 			server = new BlobServer(config, new VoidBlobStore());
-			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
-			BlobClient bc = new BlobClient(blobSocketAddress, config);
-
-			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-//			JobID jobId = new JobID();
-			JobID jobId = null;
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			BlobClient bc = new BlobClient(serverAddress, config);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			keys.add(bc.put(jobId, buf));
 			buf[0] += 1;
 			keys.add(bc.put(jobId, buf));
 
-			long cleanupInterval = 1000L;
-			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
-			libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
-			libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
+			bc.close();
 
-			assertEquals(2, checkFilesExist(jobId, keys, server, true));
-			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
-			assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
+			libCache = new BlobLibraryCacheManager(cache);
+			cache.registerJob(jobId);
 
-			libraryCacheManager.unregisterTask(jid, executionId1);
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
 
-			assertEquals(2, checkFilesExist(jobId, keys, server, true));
-			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
-			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
+			libCache.registerJob(jobId, keys, Collections.<URL>emptyList());
+			ClassLoader classLoader1 = libCache.getClassLoader(jobId);
 
-			libraryCacheManager.unregisterTask(jid, executionId2);
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
 
-			// because we cannot guarantee that there are not thread races in the build system, we
-			// loop for a certain while until the references disappear
-			{
-				long deadline = System.currentTimeMillis() + 30000;
-				do {
-					Thread.sleep(100);
-				}
-				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
-						System.currentTimeMillis() < deadline);
+			libCache.registerTask(jobId, attempt1, keys, Collections.<URL>emptyList());
+			ClassLoader classLoader2 = libCache.getClassLoader(jobId);
+			assertEquals(classLoader1, classLoader2);
+
+			try {
+				libCache.registerTask(
+					jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(),
+					Collections.<URL>emptyList());
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's what we want
 			}
 
-			// this fails if we exited via a timeout
-			assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
-			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
+			try {
+				libCache.registerTask(
+					jobId, new ExecutionAttemptID(), keys,
+					Collections.singletonList(new URL("file:///tmp/does-not-exist")));
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's what we want
+			}
 
-			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(jobId, keys, server, false));
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(2, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
 
-			bc.close();
-		} finally {
-			if (server != null) {
-				server.close();
+			libCache.unregisterJob(jobId);
+
+			assertEquals(1, libCache.getNumberOfManagedJobs());
+			assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			libCache.unregisterTask(jobId, attempt1);
+
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
+			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(2, jobId, cache);
+
+			// only calls to BlobCache#releaseJob() clean up files (tested in BlobCacheCleanupTest etc.)
+		}
+		finally {
+			if (libCache != null) {
+				libCache.shutdown();
 			}
 
-			if (libraryCacheManager != null) {
-				try {
-					libraryCacheManager.shutdown();
-				}
-				catch (IOException e) {
-					e.printStackTrace();
-				}
+			// should have been closed by the libraryCacheManager, but just in case
+			if (cache != null) {
+				cache.close();
+			}
+
+			if (server != null) {
+				server.close();
 			}
 		}
 	}
@@ -275,75 +426,103 @@ public class BlobLibraryCacheManagerTest {
 	public void testRegisterAndDownload() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
+		JobID jobId = new JobID();
 		BlobServer server = null;
 		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
 		File cacheDir = null;
 		try {
 			// create the blob transfer services
 			Configuration config = new Configuration();
 			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 				temporaryFolder.newFolder().getAbsolutePath());
+			config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1_000_000L);
 
 			server = new BlobServer(config, new VoidBlobStore());
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
-			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			JobID jobId = null;
-
 			// upload some meaningless data to the server
 			BlobClient uploader = new BlobClient(serverAddress, config);
 			BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
 			BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
 			uploader.close();
 
-			BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
-
-			assertEquals(0, libCache.getNumberOfCachedLibraries());
+			libCache = new BlobLibraryCacheManager(cache);
+			assertEquals(0, libCache.getNumberOfManagedJobs());
+			checkFileCountForJob(2, jobId, server);
+			checkFileCountForJob(0, jobId, cache);
 
 			// first try to access a non-existing entry
+			assertEquals(0, libCache.getNumberOfReferenceHolders(new JobID()));
 			try {
 				libCache.getClassLoader(new JobID());
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
-				// that#s what we want
+				// that's what we want
 			}
 
-			// now register some BLOBs as libraries
+			// register some BLOBs as libraries
 			{
-				JobID jid = new JobID();
-				ExecutionAttemptID executionId = new ExecutionAttemptID();
 				Collection<BlobKey> keys = Collections.singleton(dataKey1);
 
-				libCache.registerTask(jid, executionId, keys, Collections.<URL>emptyList());
-				assertEquals(1, libCache.getNumberOfReferenceHolders(jid));
-				assertEquals(1, libCache.getNumberOfCachedLibraries());
-				assertNotNull(libCache.getClassLoader(jid));
-
-				// un-register them again
-				libCache.unregisterTask(jid, executionId);
+				cache.registerJob(jobId);
+				ExecutionAttemptID executionId = new ExecutionAttemptID();
+				libCache.registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
+				ClassLoader classLoader1 = libCache.getClassLoader(jobId);
+				assertEquals(1, libCache.getNumberOfManagedJobs());
+				assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+				assertEquals(1, checkFilesExist(jobId, keys, cache, true));
+				checkFileCountForJob(2, jobId, server);
+				checkFileCountForJob(1, jobId, cache);
+				assertNotNull(libCache.getClassLoader(jobId));
+
+				libCache.registerJob(jobId, keys, Collections.<URL>emptyList());
+				ClassLoader classLoader2 = libCache.getClassLoader(jobId);
+				assertEquals(classLoader1, classLoader2);
+				assertEquals(1, libCache.getNumberOfManagedJobs());
+				assertEquals(2, libCache.getNumberOfReferenceHolders(jobId));
+				assertEquals(1, checkFilesExist(jobId, keys, cache, true));
+				checkFileCountForJob(2, jobId, server);
+				checkFileCountForJob(1, jobId, cache);
+				assertNotNull(libCache.getClassLoader(jobId));
+
+				// un-register the job
+				libCache.unregisterJob(jobId);
+				// still one task
+				assertEquals(1, libCache.getNumberOfManagedJobs());
+				assertEquals(1, libCache.getNumberOfReferenceHolders(jobId));
+				assertEquals(1, checkFilesExist(jobId, keys, cache, true));
+				checkFileCountForJob(2, jobId, server);
+				checkFileCountForJob(1, jobId, cache);
+
+				// unregister the task registration
+				libCache.unregisterTask(jobId, executionId);
+				assertEquals(0, libCache.getNumberOfManagedJobs());
+				assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
+				// changing the libCache registration does not influence the BLOB stores...
+				checkFileCountForJob(2, jobId, server);
+				checkFileCountForJob(1, jobId, cache);
 
 				// Don't fail if called again
-				libCache.unregisterTask(jid, executionId);
+				libCache.unregisterJob(jobId);
+				assertEquals(0, libCache.getNumberOfManagedJobs());
+				assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
 
-				assertEquals(0, libCache.getNumberOfReferenceHolders(jid));
+				libCache.unregisterTask(jobId, executionId);
+				assertEquals(0, libCache.getNumberOfManagedJobs());
+				assertEquals(0, libCache.getNumberOfReferenceHolders(jobId));
 
-				// library is still cached (but not associated with job any more)
-				assertEquals(1, libCache.getNumberOfCachedLibraries());
+				cache.releaseJob(jobId);
 
-				// should not be able to access the classloader any more
-				try {
-					libCache.getClassLoader(jid);
-					fail("Should fail with an IllegalStateException");
-				}
-				catch (IllegalStateException e) {
-					// that's what we want
-				}
+				// library is still cached (but not associated with job any more)
+				checkFileCountForJob(2, jobId, server);
+				checkFileCountForJob(1, jobId, cache);
 			}
 
 			// see BlobUtils for the directory layout
-			cacheDir = new File(cache.getStorageDir(), "no_job");
+			cacheDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile();
 			assertTrue(cacheDir.exists());
 
 			// make sure no further blobs can be downloaded by removing the write
@@ -352,12 +531,14 @@ public class BlobLibraryCacheManagerTest {
 
 			// since we cannot download this library any more, this call should fail
 			try {
-				libCache.registerTask(new JobID(), new ExecutionAttemptID(), Collections.singleton(dataKey2),
-						Collections.<URL>emptyList());
+				cache.registerJob(jobId);
+				libCache.registerTask(jobId, new ExecutionAttemptID(), Collections.singleton(dataKey2),
+					Collections.<URL>emptyList());
 				fail("This should fail with an IOException");
 			}
 			catch (IOException e) {
 				// splendid!
+				cache.releaseJob(jobId);
 			}
 		} finally {
 			if (cacheDir != null) {
@@ -368,6 +549,9 @@ public class BlobLibraryCacheManagerTest {
 			if (cache != null) {
 				cache.close();
 			}
+			if (libCache != null) {
+				libCache.shutdown();
+			}
 			if (server != null) {
 				server.close();
 			}
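
As the diff shows, BlobLibraryCacheManager no longer owns a cleanup timer;
BLOB lifetime is driven by the BlobCache reference counts instead. A sketch
of the new wiring, assuming the constructor and register/unregister methods
above (serverAddress, config, jobId, attemptId and keys are placeholders):

    BlobCache cache = new BlobCache(serverAddress, config, new VoidBlobStore());
    BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache); // no interval argument

    cache.registerJob(jobId);  // pin the job's BLOBs at the cache level
    libCache.registerTask(jobId, attemptId, keys, Collections.<URL>emptyList());
    ClassLoader userCodeLoader = libCache.getClassLoader(jobId);

    libCache.unregisterTask(jobId, attemptId);  // drops the user-code class loader when unused
    cache.releaseJob(jobId);                    // unpin -> files become subject to deferred cleanup

    libCache.shutdown();
    cache.close();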

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 2f6738d..e52310e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -49,6 +50,9 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Integration test for {@link BlobLibraryCacheManager}.
+ */
 public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 	@Rule
@@ -65,7 +69,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
 		BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
 		BlobCache cache = null;
-		BlobLibraryCacheManager libCache = null;
 		BlobStoreService blobStoreService = null;
 
 		Configuration config = new Configuration();
@@ -75,6 +78,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getAbsolutePath());
+		config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
@@ -82,7 +86,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config, blobStoreService);
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
-				libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
+				libServer[i] = new BlobLibraryCacheManager(server[i]);
 			}
 
 			// Random data
@@ -92,25 +96,22 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			List<BlobKey> keys = new ArrayList<>(2);
 
 			JobID jobId = new JobID();
-			// TODO: replace+adapt by jobId after adapting the BlobLibraryCacheManager
-			JobID blobJobId = null;
 
 			// Upload some data (libraries)
 			try (BlobClient client = new BlobClient(serverAddress[0], config)) {
-				keys.add(client.put(blobJobId, expected)); // Request 1
-				keys.add(client.put(blobJobId, expected, 32, 256)); // Request 2
+				keys.add(client.put(jobId, expected)); // Request 1
+				keys.add(client.put(jobId, expected, 32, 256)); // Request 2
 			}
 
 			// The cache
 			cache = new BlobCache(serverAddress[0], config, blobStoreService);
-			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Register uploaded libraries
 			ExecutionAttemptID executionId = new ExecutionAttemptID();
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 
 			// Verify key 1
-			File f = cache.getFile(keys.get(0));
+			File f = cache.getFile(jobId, keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -123,13 +124,11 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 			// Shutdown cache and start with other server
 			cache.close();
-			libCache.shutdown();
 
 			cache = new BlobCache(serverAddress[1], config, blobStoreService);
-			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Verify key 1
-			f = cache.getFile(keys.get(0));
+			f = cache.getFile(jobId, keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -141,7 +140,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Verify key 2
-			f = cache.getFile(keys.get(1));
+			f = cache.getFile(jobId, keys.get(1));
 			assertEquals(256, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -154,8 +153,8 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 			// Remove blobs again
 			try (BlobClient client = new BlobClient(serverAddress[1], config)) {
-				client.delete(keys.get(0));
-				client.delete(keys.get(1));
+				client.delete(jobId, keys.get(0));
+				client.delete(jobId, keys.get(1));
 			}
 
 			// Verify everything is clean below recoveryDir/<cluster_id>
@@ -167,6 +166,11 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
 		}
 		finally {
+			for (BlobLibraryCacheManager s : libServer) {
+				if (s != null) {
+					s.shutdown();
+				}
+			}
 			for (BlobServer s : server) {
 				if (s != null) {
 					s.close();
@@ -177,10 +181,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 				cache.close();
 			}
 
-			if (libCache != null) {
-				libCache.shutdown();
-			}
-
 			if (blobStoreService != null) {
 				blobStoreService.closeAndCleanupAllData();
 			}
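
The recovery test also switches to job-scoped deletes through the client; a
condensed sketch of that call shape (serverAddress, config, jobId and keys
assumed to be in scope):

    try (BlobClient client = new BlobClient(serverAddress, config)) {
        for (BlobKey key : keys) {
            client.delete(jobId, key);  // job-scoped delete, as exercised above
        }
    }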

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
new file mode 100644
index 0000000..b2b455b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase extends TestLogger {
+
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	/**
+	 * Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}.
+	 */
+	private enum TestCase {
+		JOB_FINISHES_SUCCESSFULLY,
+		JOB_IS_CANCELLED,
+		JOB_FAILS,
+		JOB_SUBMISSION_FAILS
+	}
+
+	/**
+	 * Test cleanup for a job that finishes ordinarily.
+	 */
+	@Test
+	public void testBlobServerCleanupFinishedJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+	}
+
+	/**
+	 * Test cleanup for a job which is cancelled after submission.
+	 */
+	@Test
+	public void testBlobServerCleanupCancelledJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+	}
+
+	/**
+	 * Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole
+	 * job fails again because the restart strategy permits only a single restart).
+	 */
+	@Test
+	public void testBlobServerCleanupFailedJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_FAILS);
+	}
+
+	/**
+	 * Test cleanup for a job whose submission fails (emulated by adding a reference to a
+	 * non-existent BLOB to the job graph).
+	 */
+	@Test
+	public void testBlobServerCleanupFailedSubmission() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
+	}
+
+	private void testBlobServerCleanup(final TestCase testCase) throws IOException {
+		final int numTasks = 2;
+		final File blobBaseDir = tmpFolder.newFolder();
+
+		new JavaTestKit(system) {{
+			new Within(duration("30 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+
+					TestingCluster cluster = null;
+					BlobClient bc = null;
+
+					try {
+						Configuration config = new Configuration();
+						config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+						config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+						config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
+						config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
+
+						config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
+						config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+						config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
+						// BLOBs are deleted from the BlobCache between 1s and 2s after the last
+						// reference is released -> by the time we check, the BlobCache may or may
+						// not still hold the BLOB (this effectively tests both cases randomly)
+						config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+						cluster = new TestingCluster(config);
+						cluster.start();
+
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
+
+						// we can use the default leader session ID since we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(),
+							HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+						// Create a task
+
+						JobVertex source = new JobVertex("Source");
+						if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
+							source.setInvokableClass(FailingBlockingInvokable.class);
+						} else {
+							source.setInvokableClass(NoOpInvokable.class);
+						}
+						source.setParallelism(numTasks);
+
+						JobGraph jobGraph = new JobGraph("BlobCleanupTest", source);
+						final JobID jid = jobGraph.getJobID();
+
+						// request the blob port from the job manager
+						Future<Object> future = jobManagerGateway
+							.ask(JobManagerMessages.getRequestBlobManagerPort(), remaining());
+						int blobPort = (Integer) Await.result(future, remaining());
+
+						// upload a blob
+						BlobKey key1;
+						bc = new BlobClient(new InetSocketAddress("localhost", blobPort),
+							config);
+						try {
+							key1 = bc.put(jid, new byte[10]);
+						} finally {
+							bc.close();
+						}
+						jobGraph.addBlob(key1);
+
+						if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+							// add an invalid key so that the submission fails
+							jobGraph.addBlob(new BlobKey());
+						}
+
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new JobManagerMessages.SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+							expectMsgClass(JobManagerMessages.JobResultFailure.class);
+						} else {
+							expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+							if (testCase == TestCase.JOB_FAILS) {
+								// fail a task so that the job is going to be recovered (we actually do not
+								// need the blocking part of the invokable and can start throwing right away)
+								FailingBlockingInvokable.unblock();
+
+								// the job will be restarted and the BlobCache may re-download the BLOB if it
+								// was already deleted; the tasks will then fail again and the restart strategy
+								// will finalise the job
+
+								expectMsgClass(JobManagerMessages.JobResultFailure.class);
+							} else if (testCase == TestCase.JOB_IS_CANCELLED) {
+								jobManagerGateway.tell(
+									new JobManagerMessages.CancelJob(jid),
+									testActorGateway);
+								expectMsgClass(JobManagerMessages.CancellationResponse.class);
+
+								// job will be cancelled and everything should be cleaned up
+
+								expectMsgClass(JobManagerMessages.JobResultFailure.class);
+							} else {
+								expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+							}
+						}
+
+						// both BlobServer and BlobCache should eventually delete all files
+
+						File[] blobDirs = blobBaseDir.listFiles(new FilenameFilter() {
+							@Override
+							public boolean accept(File dir, String name) {
+								return name.startsWith("blobStore-");
+							}
+						});
+						assertNotNull(blobDirs);
+						for (File blobDir : blobDirs) {
+							waitForEmptyBlobDir(blobDir, remaining());
+						}
+
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (bc != null) {
+							try {
+								bc.close();
+							} catch (IOException ignored) {
+							}
+						}
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
+				}
+			};
+		}};
+
+		// after everything has been shut down, the storage directory itself should be empty
+		assertArrayEquals(new File[] {}, blobBaseDir.listFiles());
+	}
+
+	/**
+	 * Waits until the given {@link org.apache.flink.runtime.blob.BlobService} storage directory
+	 * does not contain any job-related folders any more.
+	 *
+	 * @param blobDir
+	 * 		directory of a {@link org.apache.flink.runtime.blob.BlobServer} or {@link
+	 * 		org.apache.flink.runtime.blob.BlobCache}
+	 * @param remaining
+	 * 		remaining time for this test
+	 *
+	 * @see org.apache.flink.runtime.blob.BlobUtils
+	 */
+	private static void waitForEmptyBlobDir(File blobDir, FiniteDuration remaining)
+		throws InterruptedException {
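+		// cleanup happens asynchronously on the BlobServer/BlobCache, so poll every
+		// 100 ms for the job-related sub-directories to disappear, up to the deadline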
+		long deadline = System.currentTimeMillis() + remaining.toMillis();
+		String[] blobDirContents;
+		do {
+			blobDirContents = blobDir.list(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.startsWith("job_");
+				}
+			});
+			if (blobDirContents == null || blobDirContents.length == 0) {
+				return;
+			}
+			Thread.sleep(100);
+		} while (System.currentTimeMillis() < deadline);
+
+		fail("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(blobDirContents));
+	}
+}