You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/14 09:18:07 UTC

[1/3] flink git commit: [FLINK-7055][blob] refactor getURL() to the more generic getFile()

Repository: flink
Updated Branches:
  refs/heads/master f78eb0f8b -> 0a19c456a


[FLINK-7055][blob] refactor getURL() to the more generic getFile()

The fact that we always returned URL objects is a relic of the BlobServer's only
use for URLClassLoader. Since we'd like to extend its use, returning File
objects instead is more generic.

This closes #4236.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7c1dfaa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7c1dfaa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7c1dfaa

Branch: refs/heads/master
Commit: b7c1dfaa3175036287694520fc9bb1649707ef7d
Parents: f78eb0f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jun 21 16:14:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 14 08:37:33 2017 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagerLogHandler.java         |  2 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 17 ++++++------
 .../apache/flink/runtime/blob/BlobServer.java   | 18 ++++++-------
 .../apache/flink/runtime/blob/BlobService.java  |  8 +++---
 .../apache/flink/runtime/client/JobClient.java  |  2 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../runtime/blob/BlobCacheRetriesTest.java      |  4 +--
 .../runtime/blob/BlobCacheSuccessTest.java      | 27 ++++++--------------
 .../runtime/blob/BlobServerDeleteTest.java      |  4 +--
 .../BlobLibraryCacheManagerTest.java            |  8 +++---
 .../BlobLibraryCacheRecoveryITCase.java         |  6 ++---
 11 files changed, 44 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index b7fee2d..f175573 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -208,7 +208,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 								lastSubmittedFile.put(taskManagerID, blobKey);
 							}
 							try {
-								return blobCache.getURL(blobKey).getFile();
+								return blobCache.getFile(blobKey).getAbsolutePath();
 							} catch (IOException e) {
 								throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 32bd8fd..3e19537 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -39,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs.
  *
- * <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the
+ * <p>When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the
  * BLOB cache will first attempt to serve the file from its local cache. Only if
  * the local cache does not contain the desired BLOB, the BLOB cache will try to
  * download it from a distributed file system (if available) or the BLOB
@@ -111,21 +110,22 @@ public final class BlobCache implements BlobService {
 	}
 
 	/**
-	 * Returns the URL for the BLOB with the given key. The method will first attempt to serve
+	 * Returns a local copy of the file for the BLOB with the given key. The method will first attempt to serve
 	 * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
 	 * from this cache's BLOB server.
 	 *
 	 * @param requiredBlob The key of the desired BLOB.
-	 * @return URL referring to the local storage location of the BLOB.
+	 * @return file referring to the local storage location of the BLOB.
 	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
 	 */
-	public URL getURL(final BlobKey requiredBlob) throws IOException {
+	@Override
+	public File getFile(final BlobKey requiredBlob) throws IOException {
 		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
 		final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
 		if (localJarFile.exists()) {
-			return localJarFile.toURI().toURL();
+			return localJarFile;
 		}
 
 		// first try the distributed blob store (if available)
@@ -136,7 +136,7 @@ public final class BlobCache implements BlobService {
 		}
 
 		if (localJarFile.exists()) {
-			return localJarFile.toURI().toURL();
+			return localJarFile;
 		}
 
 		// fallback: download from the BlobServer
@@ -160,7 +160,7 @@ public final class BlobCache implements BlobService {
 				}
 
 				// success, we finished
-				return localJarFile.toURI().toURL();
+				return localJarFile;
 			}
 			catch (Throwable t) {
 				String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
@@ -188,6 +188,7 @@ public final class BlobCache implements BlobService {
 	 * Deletes the file associated with the given key from the BLOB cache.
 	 * @param key referring to the file to be deleted
 	 */
+	@Override
 	public void delete(BlobKey key) throws IOException{
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ecb4527..add9f7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -34,7 +34,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -334,22 +333,23 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
-	 * Method which retrieves the URL of a file associated with a blob key. The blob server looks
-	 * the blob key up in its local storage. If the file exists, then the URL is returned. If the
-	 * file does not exist, then a FileNotFoundException is thrown.
+	 * Method which retrieves the local path of a file associated with a blob key. The blob server
+	 * looks the blob key up in its local storage. If the file exists, it is returned. If the
+	 * file does not exist, it is retrieved from the HA blob store (if available) or a
+	 * FileNotFoundException is thrown.
 	 *
 	 * @param requiredBlob blob key associated with the requested file
-	 * @return URL of the file
-	 * @throws IOException
+	 * @return file referring to the local storage location of the BLOB.
+	 * @throws IOException Thrown if the file retrieval failed.
 	 */
 	@Override
-	public URL getURL(BlobKey requiredBlob) throws IOException {
+	public File getFile(BlobKey requiredBlob) throws IOException {
 		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
 		final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
 		if (localFile.exists()) {
-			return localFile.toURI().toURL();
+			return localFile;
 		}
 		else {
 			try {
@@ -361,7 +361,7 @@ public class BlobServer extends Thread implements BlobService {
 			}
 
 			if (localFile.exists()) {
-				return localFile.toURI().toURL();
+				return localFile;
 			}
 			else {
 				throw new FileNotFoundException("Local file " + localFile + " does not exist " +

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index c1447c8..1e56f26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
-import java.net.URL;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
@@ -28,14 +28,14 @@ import java.net.URL;
 public interface BlobService extends Closeable {
 
 	/**
-	 * Returns the URL of the file associated with the provided blob key.
+	 * Returns the path to a local copy of the file associated with the provided blob key.
 	 *
 	 * @param key blob key associated with the requested file
-	 * @return The URL to the file.
+	 * @return The path to the file.
 	 * @throws java.io.FileNotFoundException when the path does not exist;
 	 * @throws IOException if any other error occurs when retrieving the file
 	 */
-	URL getURL(BlobKey key) throws IOException;
+	File getFile(BlobKey key) throws IOException;
 
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 19f0e2c..e3657ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,7 +234,7 @@ public class JobClient {
 			int pos = 0;
 			for (BlobKey blobKey : props.requiredJarFiles()) {
 				try {
-					allURLs[pos++] = blobClient.getURL(blobKey);
+					allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
 				} catch (Exception e) {
 					try {
 						blobClient.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0387725..9aff6f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -254,7 +254,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 		// it is important that we fetch the URL before increasing the counter.
 		// in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter
 		try {
-			URL url = blobService.getURL(key);
+			URL url = blobService.getFile(key).toURI().toURL();
 
 			Integer references = blobKeyReferenceCounters.get(key);
 			int newReferences = references == null ? 1 : references + 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 366b592..fe763fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -115,7 +115,7 @@ public class BlobCacheRetriesTest {
 			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail the first two times, but retry, and succeed eventually
-			URL url = cache.getURL(key);
+			URL url = cache.getFile(key).toURI().toURL();
 			InputStream is = url.openStream();
 			try {
 				byte[] received = new byte[data.length];
@@ -211,7 +211,7 @@ public class BlobCacheRetriesTest {
 
 			// trigger a download - it should fail eventually
 			try {
-				cache.getURL(key);
+				cache.getFile(key);
 				fail("This should fail");
 			}
 			catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 51be1b0..d06f76f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -30,15 +30,12 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This class contains unit tests for the {@link BlobCache}.
@@ -175,7 +172,7 @@ public class BlobCacheSuccessTest {
 			blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
 
 			for (BlobKey blobKey : blobKeys) {
-				blobCache.getURL(blobKey);
+				blobCache.getFile(blobKey);
 			}
 
 			if (blobServer != null) {
@@ -184,28 +181,20 @@ public class BlobCacheSuccessTest {
 				blobServer = null;
 			}
 
-			final URL[] urls = new URL[blobKeys.size()];
+			final File[] files = new File[blobKeys.size()];
 
 			for(int i = 0; i < blobKeys.size(); i++){
-				urls[i] = blobCache.getURL(blobKeys.get(i));
+				files[i] = blobCache.getFile(blobKeys.get(i));
 			}
 
 			// Verify the result
-			assertEquals(blobKeys.size(), urls.length);
+			assertEquals(blobKeys.size(), files.length);
 
-			for (final URL url : urls) {
+			for (final File file : files) {
+				assertNotNull(file);
 
-				assertNotNull(url);
-
-				try {
-					final File cachedFile = new File(url.toURI());
-
-					assertTrue(cachedFile.exists());
-					assertEquals(buf.length, cachedFile.length());
-
-				} catch (URISyntaxException e) {
-					fail(e.getMessage());
-				}
+				assertTrue(file.exists());
+				assertEquals(buf.length, file.length());
 			}
 		} finally {
 			if (blobServer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 5db9568..ce4574b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -109,7 +109,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			// delete a file directly on the server
 			server.delete(key2);
 			try {
-				server.getURL(key2);
+				server.getFile(key2);
 				fail("BLOB should have been deleted");
 			}
 			catch (IOException e) {
@@ -209,7 +209,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			server.delete(key);
 
 			// the file should still be there
-			server.getURL(key);
+			server.getFile(key);
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606d8c9..476fdcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -108,13 +108,13 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, checkFilesExist(keys, server, false));
 
 			try {
-				server.getURL(keys.get(0));
+				server.getFile(keys.get(0));
 				fail("name-addressable BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
 			try {
-				server.getURL(keys.get(1));
+				server.getFile(keys.get(1));
 				fail("name-addressable BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
@@ -150,7 +150,7 @@ public class BlobLibraryCacheManagerTest {
 	 * @param doThrow
 	 * 		whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
 	 *
-	 * @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)}
+	 * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
 	 */
 	private static int checkFilesExist(
 		List<BlobKey> keys, BlobService blobService, boolean doThrow)
@@ -159,7 +159,7 @@ public class BlobLibraryCacheManagerTest {
 
 		for (BlobKey key : keys) {
 			try {
-				blobService.getURL(key);
+				blobService.getFile(key);
 				++numFiles;
 			} catch (IOException e) {
 				if (doThrow) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7c1dfaa/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index e5efd19..b19835b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -107,7 +107,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 
 			// Verify key 1
-			File f = new File(cache.getURL(keys.get(0)).toURI());
+			File f = cache.getFile(keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -126,7 +126,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Verify key 1
-			f = new File(cache.getURL(keys.get(0)).toURI());
+			f = cache.getFile(keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -138,7 +138,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Verify key 2
-			f = new File(cache.getURL(keys.get(1)).toURI());
+			f = cache.getFile(keys.get(1));
 			assertEquals(256, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {


[2/3] flink git commit: [FLINK-7056][blob] add API to allow job-related BLOBs to be stored

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index d06f76f..1216be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,69 +42,123 @@ import static org.junit.Assert.assertTrue;
 /**
  * This class contains unit tests for the {@link BlobCache}.
  */
-public class BlobCacheSuccessTest {
+public class BlobCacheSuccessTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
-	 * BlobCache with no HA. BLOBs need to be downloaded form a working
+	 * BlobCache with no HA, job-unrelated BLOBs. BLOBs need to be downloaded from a working
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCache() throws IOException {
+	public void testBlobNoJobCache() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		uploadFileGetTest(config, false, false);
+		uploadFileGetTest(config, null, false, false);
+	}
+
+	/**
+	 * BlobCache with no HA, job-related BLOBs. BLOBs need to be downloaded from a working
+	 * BlobServer.
+	 */
+	@Test
+	public void testBlobForJobCache() throws IOException {
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		uploadFileGetTest(config, new JobID(), false, false);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode and the cache can download files from
 	 * the file system directly and does not need to download BLOBs from the
-	 * BlobServer.
+	 * BlobServer. Using job-unrelated BLOBs.
+	 */
+	@Test
+	public void testBlobNoJobCacheHa() throws IOException {
+		testBlobCacheHa(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode and the cache can download files from
+	 * the file system directly and does not need to download BLOBs from the
+	 * BlobServer. Using job-related BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHa() throws IOException {
+	public void testBlobForJobCacheHa() throws IOException {
+		testBlobCacheHa(new JobID());
+	}
+
+	private void testBlobCacheHa(final JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, true, true);
+		uploadFileGetTest(config, jobId, true, true);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode and the cache can download files from
 	 * the file system directly and does not need to download BLOBs from the
-	 * BlobServer.
+	 * BlobServer. Using job-unrelated BLOBs.
+	 */
+	@Test
+	public void testBlobNoJobCacheHa2() throws IOException {
+		testBlobCacheHa2(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode and the cache can download files from
+	 * the file system directly and does not need to download BLOBs from the
+	 * BlobServer. Using job-related BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHa2() throws IOException {
+	public void testBlobForJobCacheHa2() throws IOException {
+		testBlobCacheHa2(new JobID());
+	}
+
+	private void testBlobCacheHa2(JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, false, true);
+		uploadFileGetTest(config, jobId, false, true);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode but the cache itself cannot access the
-	 * file system and thus needs to download BLOBs from the BlobServer.
+	 * file system and thus needs to download BLOBs from the BlobServer. Using job-unrelated BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHaFallback() throws IOException {
+	public void testBlobNoJobCacheHaFallback() throws IOException {
+		testBlobCacheHaFallback(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode but the cache itself cannot access the
+	 * file system and thus needs to download BLOBs from the BlobServer. Using job-related BLOBs.
+	 */
+	@Test
+	public void testBlobForJobCacheHaFallback() throws IOException {
+		testBlobCacheHaFallback(new JobID());
+	}
+
+	private void testBlobCacheHaFallback(final JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, false, false);
+		uploadFileGetTest(config, jobId, false, false);
 	}
 
 	/**
@@ -119,7 +175,7 @@ public class BlobCacheSuccessTest {
 	 * 		whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
 	 * 		HA mode)
 	 */
-	private void uploadFileGetTest(final Configuration config, boolean shutdownServerAfterUpload,
+	private void uploadFileGetTest(final Configuration config, JobID jobId, boolean shutdownServerAfterUpload,
 			boolean cacheHasAccessToFs) throws IOException {
 		Preconditions.checkArgument(!shutdownServerAfterUpload || cacheHasAccessToFs);
 
@@ -154,9 +210,9 @@ public class BlobCacheSuccessTest {
 
 				blobClient = new BlobClient(serverAddress, config);
 
-				blobKeys.add(blobClient.put(buf));
+				blobKeys.add(blobClient.put(jobId, buf));
 				buf[0] = 1; // Make sure the BLOB key changes
-				blobKeys.add(blobClient.put(buf));
+				blobKeys.add(blobClient.put(jobId, buf));
 			} finally {
 				if (blobClient != null) {
 					blobClient.close();
@@ -172,7 +228,11 @@ public class BlobCacheSuccessTest {
 			blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
 
 			for (BlobKey blobKey : blobKeys) {
-				blobCache.getFile(blobKey);
+				if (jobId == null) {
+					blobCache.getFile(blobKey);
+				} else {
+					blobCache.getFile(jobId, blobKey);
+				}
 			}
 
 			if (blobServer != null) {
@@ -184,7 +244,11 @@ public class BlobCacheSuccessTest {
 			final File[] files = new File[blobKeys.size()];
 
 			for(int i = 0; i < blobKeys.size(); i++){
-				files[i] = blobCache.getFile(blobKeys.get(i));
+				if (jobId == null) {
+					files[i] = blobCache.getFile(blobKeys.get(i));
+				} else {
+					files[i] = blobCache.getFile(jobId, blobKeys.get(i));
+				}
 			}
 
 			// Verify the result

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2932f41..cfec4c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -38,6 +38,7 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.JobID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -144,7 +145,7 @@ public class BlobClientTest {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream
 	 */
-	private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
+	static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
 		byte[] receivedBuffer = new byte[buf.length];
 
 		int bytesReceived = 0;
@@ -220,13 +221,20 @@ public class BlobClientTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
+			JobID jobId = new JobID();
+
 			// Store the data
-			BlobKey receivedKey = client.put(testBuffer);
+			BlobKey receivedKey = client.put(null, testBuffer);
+			assertEquals(origKey, receivedKey);
+			// try again with a job-related BLOB:
+			receivedKey = client.put(jobId, testBuffer);
 			assertEquals(origKey, receivedKey);
 
 			// Retrieve the data
 			InputStream is = client.get(receivedKey);
 			validateGet(is, testBuffer);
+			is = client.get(jobId, receivedKey);
+			validateGet(is, testBuffer);
 
 			// Check reaction to invalid keys
 			try {
@@ -236,6 +244,15 @@ public class BlobClientTest {
 			catch (IOException fnfe) {
 				// expected
 			}
+			// new client needed (closed from failure above)
+			client = new BlobClient(serverAddress, getBlobClientConfig());
+			try {
+				client.get(jobId, new BlobKey());
+				fail("Expected IOException did not occur");
+			}
+			catch (IOException fnfe) {
+				// expected
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -276,10 +293,16 @@ public class BlobClientTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
+			JobID jobId = new JobID();
+
 			// Store the data
 			is = new FileInputStream(testFile);
 			BlobKey receivedKey = client.put(is);
 			assertEquals(origKey, receivedKey);
+			// try again with a job-related BLOB:
+			is = new FileInputStream(testFile);
+			receivedKey = client.put(jobId, is);
+			assertEquals(origKey, receivedKey);
 
 			is.close();
 			is = null;
@@ -287,6 +310,8 @@ public class BlobClientTest {
 			// Retrieve the data
 			is = client.get(receivedKey);
 			validateGet(is, testFile);
+			is = client.get(jobId, receivedKey);
+			validateGet(is, testFile);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -324,6 +349,13 @@ public class BlobClientTest {
 
 		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 
+		uploadJarFile(serverAddress, blobClientConfig, testFile);
+		uploadJarFile(serverAddress, blobClientConfig, testFile);
+	}
+
+	private static void uploadJarFile(
+		final InetSocketAddress serverAddress, final Configuration blobClientConfig,
+		final File testFile) throws IOException {
 		List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
 			Collections.singletonList(new Path(testFile.toURI())));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3c7711d..81304f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -94,11 +94,17 @@ public class BlobRecoveryITCase extends TestLogger {
 
 			BlobKey[] keys = new BlobKey[2];
 
-			// Put data
-			keys[0] = client.put(expected); // Request 1
-			keys[1] = client.put(expected, 32, 256); // Request 2
+			// Put job-unrelated data
+			keys[0] = client.put(null, expected); // Request 1
+			keys[1] = client.put(null, expected, 32, 256); // Request 2
 
+			// Put job-related data, verify that the checksums match
 			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+			BlobKey key;
+			key = client.put(jobId[0], expected); // Request 3
+			assertEquals(keys[0], key);
+			key = client.put(jobId[1], expected, 32, 256); // Request 4
+			assertEquals(keys[1], key);
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");
@@ -130,9 +136,31 @@ public class BlobRecoveryITCase extends TestLogger {
 				}
 			}
 
+			// Verify request 3
+			try (InputStream is = client.get(jobId[0], keys[0])) {
+				byte[] actual = new byte[expected.length];
+				BlobUtils.readFully(is, actual, 0, expected.length, null);
+
+				for (int i = 0; i < expected.length; i++) {
+					assertEquals(expected[i], actual[i]);
+				}
+			}
+
+			// Verify request 4
+			try (InputStream is = client.get(jobId[1], keys[1])) {
+				byte[] actual = new byte[256];
+				BlobUtils.readFully(is, actual, 0, 256, null);
+
+				for (int i = 32, j = 0; i < 256; i++, j++) {
+					assertEquals(expected[i], actual[j]);
+				}
+			}
+
 			// Remove again
 			client.delete(keys[0]);
 			client.delete(keys[1]);
+			client.delete(jobId[0], keys[0]);
+			client.delete(jobId[1], keys[1]);
 
 			// Verify everything is clean
 			assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index ce4574b..d91aae42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -39,6 +40,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -75,35 +77,45 @@ public class BlobServerDeleteTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
-			assertNotNull(key);
+			// put job-unrelated (like libraries)
+			BlobKey key1 = client.put(null, data);
+			assertNotNull(key1);
 
-			// second item
+			// second job-unrelated item
 			data[0] ^= 1;
-			BlobKey key2 = client.put(data);
+			BlobKey key2 = client.put(null, data);
 			assertNotNull(key2);
-			assertNotEquals(key, key2);
+			assertNotEquals(key1, key2);
+
+			// put job-related with same key1 as non-job-related
+			data[0] ^= 1; // back to the original data
+			final JobID jobId = new JobID();
+			BlobKey key1b = client.put(jobId, data);
+			assertNotNull(key1b);
+			assertEquals(key1, key1b);
 
 			// issue a DELETE request via the client
-			client.delete(key);
+			client.delete(key1);
 			client.close();
 
 			client = new BlobClient(serverAddress, config);
 			try {
-				client.get(key);
+				client.get(key1);
 				fail("BLOB should have been deleted");
 			}
 			catch (IOException e) {
 				// expected
 			}
 
+			ensureClientIsClosed(client);
+
+			client = new BlobClient(serverAddress, config);
 			try {
-				client.put(new byte[1]);
-				fail("client should be closed after erroneous operation");
+				client.get(jobId, key1);
 			}
-			catch (IllegalStateException e) {
+			catch (IOException e) {
 				// expected
+				fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
 			}
 
 			// delete a file directly on the server
@@ -125,8 +137,29 @@ public class BlobServerDeleteTest extends TestLogger {
 		}
 	}
 
+	private static void ensureClientIsClosed(final BlobClient client) throws IOException {
+		try {
+			client.put(null, new byte[1]);
+			fail("client should be closed after erroneous operation");
+		}
+		catch (IllegalStateException e) {
+			// expected
+		} finally {
+			client.close();
+		}
+	}
+
+	@Test
+	public void testDeleteAlreadyDeletedNoJob() {
+		testDeleteAlreadyDeleted(null);
+	}
+
 	@Test
-	public void testDeleteAlreadyDeletedByBlobKey() {
+	public void testDeleteAlreadyDeletedForJob() {
+		testDeleteAlreadyDeleted(new JobID());
+	}
+
+	private void testDeleteAlreadyDeleted(final JobID jobId) {
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
@@ -143,23 +176,27 @@ public class BlobServerDeleteTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			// put file
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
-			File blobFile = server.getStorageLocation(key);
+			File blobFile = server.getStorageLocation(jobId, key);
 			assertTrue(blobFile.delete());
 
 			// issue a DELETE request via the client
 			try {
-				client.delete(key);
+				deleteHelper(client, jobId, key);
 			}
 			catch (IOException e) {
 				fail("DELETE operation should not fail if file is already deleted");
 			}
 
 			// issue a DELETE request on the server
-			server.delete(key);
+			if (jobId == null) {
+				server.delete(key);
+			} else {
+				server.delete(jobId, key);
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -170,8 +207,25 @@ public class BlobServerDeleteTest extends TestLogger {
 		}
 	}
 
+	private static void deleteHelper(BlobClient client, JobID jobId, BlobKey key) throws IOException {
+		if (jobId == null) {
+			client.delete(key);
+		} else {
+			client.delete(jobId, key);
+		}
+	}
+
 	@Test
-	public void testDeleteByBlobKeyFails() {
+	public void testDeleteFailsNoJob() {
+		testDeleteFails(null);
+	}
+
+	@Test
+	public void testDeleteFailsForJob() {
+		testDeleteFails(new JobID());
+	}
+
+	private void testDeleteFails(final JobID jobId) {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -193,29 +247,39 @@ public class BlobServerDeleteTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
-			blobFile = server.getStorageLocation(key);
+			blobFile = server.getStorageLocation(jobId, key);
 			directory = blobFile.getParentFile();
 
 			assertTrue(blobFile.setWritable(false, false));
 			assertTrue(directory.setWritable(false, false));
 
 			// issue a DELETE request via the client
-			client.delete(key);
+			deleteHelper(client, jobId, key);
 
 			// issue a DELETE request on the server
-			server.delete(key);
+			if (jobId == null) {
+				server.delete(key);
+			} else {
+				server.delete(jobId, key);
+			}
 
 			// the file should still be there
-			server.getFile(key);
+			if (jobId == null) {
+				server.getFile(key);
+			} else {
+				server.getFile(jobId, key);
+			}
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		} finally {
 			if (blobFile != null && directory != null) {
+				//noinspection ResultOfMethodCallIgnored
 				blobFile.setWritable(true, false);
+				//noinspection ResultOfMethodCallIgnored
 				directory.setWritable(true, false);
 			}
 			cleanup(server, client);
@@ -233,10 +297,29 @@ public class BlobServerDeleteTest extends TestLogger {
 	 * broken.
 	 */
 	@Test
-	public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentDeleteOperations(null);
+	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exists
+	 * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
+	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
+	 * broken.
+	 */
+	@Test
+	public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentDeleteOperations(new JobID());
+	}
+
+	private void testConcurrentDeleteOperations(final JobID jobId)
+		throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-
 		final BlobStore blobStore = mock(BlobStore.class);
 
 		final int concurrentDeleteOperations = 3;
@@ -251,16 +334,16 @@ public class BlobServerDeleteTest extends TestLogger {
 			final BlobKey blobKey;
 
 			try (BlobClient client = blobServer.createClient()) {
-				blobKey = client.put(data);
+				blobKey = client.put(jobId, data);
 			}
 
-			assertTrue(blobServer.getStorageLocation(blobKey).exists());
+			assertTrue(blobServer.getStorageLocation(jobId, blobKey).exists());
 
 			for (int i = 0; i < concurrentDeleteOperations; i++) {
 				CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
-							blobClient.delete(blobKey);
+							deleteHelper(blobClient, jobId, blobKey);
 						} catch (IOException e) {
 							throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
 						}
@@ -278,13 +361,13 @@ public class BlobServerDeleteTest extends TestLogger {
 			// in case of no lock, one of the delete operations should eventually fail
 			waitFuture.get();
 
-			assertFalse(blobServer.getStorageLocation(blobKey).exists());
+			assertFalse(blobServer.getStorageLocation(jobId, blobKey).exists());
 		} finally {
 			executor.shutdownNow();
 		}
 	}
 
-	private void cleanup(BlobServer server, BlobClient client) {
+	private static void cleanup(BlobServer server, BlobClient client) {
 		if (client != null) {
 			try {
 				client.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index bd27d70..5ad8d95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -69,7 +70,28 @@ public class BlobServerGetTest extends TestLogger {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void testGetFailsDuringLookup() throws IOException {
+	public void testGetFailsDuringLookup1() throws IOException {
+		testGetFailsDuringLookup(null, new JobID());
+	}
+
+	@Test
+	public void testGetFailsDuringLookup2() throws IOException {
+		testGetFailsDuringLookup(new JobID(), new JobID());
+	}
+
+	@Test
+	public void testGetFailsDuringLookup3() throws IOException {
+		testGetFailsDuringLookup(new JobID(), null);
+	}
+
+	/**
+	 * Checks the correct result if a GET operation fails during the lookup of the file.
+	 *
+	 * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
+	 * @param jobId2 second job ID different to <tt>jobId1</tt>
+	 */
+	private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2)
+		throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -86,20 +108,29 @@ public class BlobServerGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId1, data);
 			assertNotNull(key);
 
-			// delete all files to make sure that GET requests fail
-			File blobFile = server.getStorageLocation(key);
+			// delete file to make sure that GET requests fail
+			File blobFile = server.getStorageLocation(jobId1, key);
 			assertTrue(blobFile.delete());
 
 			// issue a GET request that fails
-			try {
-				client.get(key);
-				fail("This should not succeed.");
-			} catch (IOException e) {
-				// expected
-			}
+			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+			BlobKey key2 = client.put(jobId2, data);
+			assertNotNull(key2);
+			assertEquals(key, key2);
+			// request for jobId2 should succeed
+			getFileHelper(client, jobId2, key);
+			// request for jobId1 should still fail
+			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+			// same checks as for jobId1 but for jobId2 should also work:
+			blobFile = server.getStorageLocation(jobId2, key);
+			assertTrue(blobFile.delete());
+			client = verifyDeleted(client, jobId2, key, serverAddress, config);
+
 		} finally {
 			if (client != null) {
 				client.close();
@@ -110,8 +141,51 @@ public class BlobServerGetTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Checks that the given blob does not exist anymore.
+	 *
+	 * @param client
+	 * 		BLOB client to use for connecting to the BLOB server
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key
+	 * 		key identifying the BLOB to request
+	 * @param serverAddress
+	 * 		BLOB server address
+	 * @param config
+	 * 		client config
+	 *
+	 * @return a new client (since the old one is being closed on failure)
+	 */
+	private static BlobClient verifyDeleted(
+			BlobClient client, JobID jobId, BlobKey key,
+			InetSocketAddress serverAddress, Configuration config) throws IOException {
+		try {
+			getFileHelper(client, jobId, key);
+			fail("This should not succeed.");
+		} catch (IOException e) {
+			// expected
+		}
+		// need a new client (old one closed due to failure)
+		return new BlobClient(serverAddress, config);
+	}
+
+	@Test
+	public void testGetFailsDuringStreamingNoJob() throws IOException {
+		testGetFailsDuringStreaming(null);
+	}
+
 	@Test
-	public void testGetFailsDuringStreaming() throws IOException {
+	public void testGetFailsDuringStreamingForJob() throws IOException {
+		testGetFailsDuringStreaming(new JobID());
+	}
+
+	/**
+	 * Checks the correct result if a GET operation fails during the file download.
+	 *
+	 * @param jobId job ID or <tt>null</tt> if job-unrelated
+	 */
+	private void testGetFailsDuringStreaming(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -128,11 +202,11 @@ public class BlobServerGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
 			// issue a GET request that succeeds
-			InputStream is = client.get(key);
+			InputStream is = getFileHelper(client, jobId, key);
 
 			byte[] receiveBuffer = new byte[data.length];
 			int firstChunkLen = 50000;
@@ -169,8 +243,22 @@ public class BlobServerGetTest extends TestLogger {
 	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
 	 */
 	@Test
-	public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentGetOperations(null);
+	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 */
+	@Test
+	public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentGetOperations(new JobID());
+	}
+
+	private void testConcurrentGetOperations(final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -191,14 +279,14 @@ public class BlobServerGetTest extends TestLogger {
 			new Answer() {
 				@Override
 				public Object answer(InvocationOnMock invocation) throws Throwable {
-					File targetFile = (File) invocation.getArguments()[1];
+					File targetFile = (File) invocation.getArguments()[2];
 
 					FileUtils.copyInputStreamToFile(bais, targetFile);
 
 					return null;
 				}
 			}
-		).when(blobStore).get(any(BlobKey.class), any(File.class));
+		).when(blobStore).get(any(JobID.class), any(BlobKey.class), any(File.class));
 
 		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
@@ -207,7 +295,7 @@ public class BlobServerGetTest extends TestLogger {
 				CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient();
-							 InputStream inputStream = blobClient.get(blobKey)) {
+							InputStream inputStream = getFileHelper(blobClient, jobId, blobKey)) {
 							byte[] buffer = new byte[data.length];
 
 							IOUtils.readFully(inputStream, buffer);
@@ -241,9 +329,18 @@ public class BlobServerGetTest extends TestLogger {
 			}
 
 			// verify that we downloaded the requested blob exactly once from the BlobStore
-			verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+			verify(blobStore, times(1)).get(eq(jobId), eq(blobKey), any(File.class));
 		} finally {
 			executor.shutdownNow();
 		}
 	}
+
+	static InputStream getFileHelper(BlobClient blobClient, JobID jobId, BlobKey blobKey)
+		throws IOException {
+		if (jobId == null) {
+			return blobClient.get(blobKey);
+		} else {
+			return blobClient.get(jobId, blobKey);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c479167..f55adb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -71,28 +73,43 @@ public class BlobServerPutTest extends TestLogger {
 	// --- concurrency tests for utility methods which could fail during the put operation ---
 
 	/**
-	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+	 * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
 	 */
 	public static class ContentAddressableGetStorageLocation extends CheckedThread {
 		private final BlobServer server;
+		private final JobID jobId;
 		private final BlobKey key;
 
-		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+		public ContentAddressableGetStorageLocation(BlobServer server, JobID jobId, BlobKey key) {
 			this.server = server;
+			this.jobId = jobId;
 			this.key = key;
 		}
 
 		@Override
 		public void go() throws Exception {
-			server.getStorageLocation(key);
+			server.getStorageLocation(jobId, key);
 		}
 	}
 
 	/**
-	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(BlobKey)}.
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
 	 */
 	@Test
-	public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception {
+	public void testServerContentAddressableGetStorageLocationConcurrentNoJob() throws Exception {
+		testServerContentAddressableGetStorageLocationConcurrent(null);
+	}
+
+	/**
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
+	 */
+	@Test
+	public void testServerContentAddressableGetStorageLocationConcurrentForJob() throws Exception {
+		testServerContentAddressableGetStorageLocationConcurrent(new JobID());
+	}
+
+	private void testServerContentAddressableGetStorageLocationConcurrent(final JobID jobId)
+		throws Exception {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -101,9 +118,9 @@ public class BlobServerPutTest extends TestLogger {
 		try {
 			BlobKey key = new BlobKey();
 			CheckedThread[] threads = new CheckedThread[] {
-				new ContentAddressableGetStorageLocation(server, key),
-				new ContentAddressableGetStorageLocation(server, key),
-				new ContentAddressableGetStorageLocation(server, key)
+				new ContentAddressableGetStorageLocation(server, jobId, key),
+				new ContentAddressableGetStorageLocation(server, jobId, key),
+				new ContentAddressableGetStorageLocation(server, jobId, key)
 			};
 			checkedThreadSimpleTest(threads);
 		} finally {
@@ -134,7 +151,27 @@ public class BlobServerPutTest extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testPutBufferSuccessful() throws IOException {
+	public void testPutBufferSuccessfulGet1() throws IOException {
+		testPutBufferSuccessfulGet(null, null);
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet2() throws IOException {
+		testPutBufferSuccessfulGet(null, new JobID());
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet3() throws IOException {
+		testPutBufferSuccessfulGet(new JobID(), new JobID());
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet4() throws IOException {
+		testPutBufferSuccessfulGet(new JobID(), null);
+	}
+
+	private void testPutBufferSuccessfulGet(final JobID jobId1, final JobID jobId2)
+		throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -150,17 +187,66 @@ public class BlobServerPutTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key1 = client.put(data);
-			assertNotNull(key1);
+			// put data for jobId1 and verify
+			BlobKey key1a = client.put(jobId1, data);
+			assertNotNull(key1a);
+
+			BlobKey key1b = client.put(jobId1, data, 10, 44);
+			assertNotNull(key1b);
+
+			testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+
+			// now put data for jobId2 and verify that both are ok
+			BlobKey key2a = client.put(jobId2, data);
+			assertNotNull(key2a);
+			assertEquals(key1a, key2a);
+
+			BlobKey key2b = client.put(jobId2, data, 10, 44);
+			assertNotNull(key2b);
+			assertEquals(key1b, key2b);
+
+
+			testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+			testPutBufferSuccessfulGet(jobId2, key2a, key2b, data, serverAddress, config);
+
+
+		} finally {
+			if (client != null) {
+				client.close();
+			}
+			if (server != null) {
+				server.close();
+			}
+		}
+	}
 
-			BlobKey key2 = client.put(data, 10, 44);
-			assertNotNull(key2);
+	/**
+	 * GET the data stored at the two keys and check that it is equal to <tt>data</tt>.
+	 *
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key1
+	 * 		first key
+	 * @param key2
+	 * 		second key
+	 * @param data
+	 * 		expected data
+	 * @param serverAddress
+	 * 		BlobServer address to connect to
+	 * @param config
+	 * 		client configuration
+	 */
+	private static void testPutBufferSuccessfulGet(
+			JobID jobId, BlobKey key1, BlobKey key2, byte[] data,
+			InetSocketAddress serverAddress, Configuration config) throws IOException {
 
-			// --- GET the data and check that it is equal ---
+		BlobClient client = new BlobClient(serverAddress, config);
+		InputStream is1 = null;
+		InputStream is2 = null;
 
+		try {
 			// one get request on the same client
-			InputStream is1 = client.get(key2);
+			is1 = getFileHelper(client, jobId, key2);
 			byte[] result1 = new byte[44];
 			BlobUtils.readFully(is1, result1, 0, result1.length, null);
 			is1.close();
@@ -173,24 +259,31 @@ public class BlobServerPutTest extends TestLogger {
 			client.close();
 			client = new BlobClient(serverAddress, config);
 
-			InputStream is2 = client.get(key1);
-			byte[] result2 = new byte[data.length];
-			BlobUtils.readFully(is2, result2, 0, result2.length, null);
+			is2 = getFileHelper(client, jobId, key1);
+			BlobClientTest.validateGet(is2, data);
 			is2.close();
-			assertArrayEquals(data, result2);
 		} finally {
-			if (client != null) {
-				client.close();
+			if (is1 != null) {
+				is1.close();
 			}
-			if (server != null) {
-				server.close();
+			if (is2 != null) {
+				is2.close();
+			}
+			client.close();
 		}
 	}
 
+	@Test
+	public void testPutStreamSuccessfulNoJob() throws IOException {
+		testPutStreamSuccessful(null);
+	}
 
 	@Test
-	public void testPutStreamSuccessful() throws IOException {
+	public void testPutStreamSuccessfulForJob() throws IOException {
+		testPutStreamSuccessful(new JobID());
+	}
+
+	private void testPutStreamSuccessful(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -208,7 +301,12 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			{
-				BlobKey key1 = client.put(new ByteArrayInputStream(data));
+				BlobKey key1;
+				if (jobId == null) {
+					key1 = client.put(new ByteArrayInputStream(data));
+				} else {
+					key1 = client.put(jobId, new ByteArrayInputStream(data));
+				}
 				assertNotNull(key1);
 			}
 		} finally {
@@ -226,7 +324,16 @@ public class BlobServerPutTest extends TestLogger {
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessful() throws IOException {
+	public void testPutChunkedStreamSuccessfulNoJob() throws IOException {
+		testPutChunkedStreamSuccessful(null);
+	}
+
+	@Test
+	public void testPutChunkedStreamSuccessfulForJob() throws IOException {
+		testPutChunkedStreamSuccessful(new JobID());
+	}
+
+	private void testPutChunkedStreamSuccessful(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -244,7 +351,12 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			{
-				BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
+				BlobKey key1;
+				if (jobId == null) {
+					key1 = client.put(new ChunkedInputStream(data, 19));
+				} else {
+					key1 = client.put(jobId, new ChunkedInputStream(data, 19));
+				}
 				assertNotNull(key1);
 			}
 		} finally {
@@ -258,7 +370,16 @@ public class BlobServerPutTest extends TestLogger {
 	}
 
 	@Test
-	public void testPutBufferFails() throws IOException {
+	public void testPutBufferFailsNoJob() throws IOException {
+		testPutBufferFails(null);
+	}
+
+	@Test
+	public void testPutBufferFailsForJob() throws IOException {
+		testPutBufferFails(new JobID());
+	}
+
+	private void testPutBufferFails(final JobID jobId) throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -285,7 +406,7 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			try {
-				client.put(data);
+				client.put(jobId, data);
 				fail("This should fail.");
 			}
 			catch (IOException e) {
@@ -293,7 +414,7 @@ public class BlobServerPutTest extends TestLogger {
 			}
 
 			try {
-				client.put(data);
+				client.put(jobId, data);
 				fail("Client should be closed");
 			}
 			catch (IllegalStateException e) {
@@ -320,7 +441,22 @@ public class BlobServerPutTest extends TestLogger {
 	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
 	 */
 	@Test
-	public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentPutOperations(null);
+	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+	 */
+	@Test
+	public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentPutOperations(new JobID());
+	}
+
+	private void testConcurrentPutOperations(final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -331,7 +467,7 @@ public class BlobServerPutTest extends TestLogger {
 		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
 		final byte[] data = new byte[dataSize];
 
-		ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+		ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList<>(concurrentPutOperations);
 
 		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
 
@@ -342,7 +478,13 @@ public class BlobServerPutTest extends TestLogger {
 				CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
-							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+							if (jobId == null) {
+								return blobClient
+									.put(new BlockingInputStream(countDownLatch, data));
+							} else {
+								return blobClient
+									.put(jobId, new BlockingInputStream(countDownLatch, data));
+							}
 						} catch (IOException e) {
 							throw new FlinkFutureException("Could not upload blob.", e);
 						}
@@ -369,7 +511,7 @@ public class BlobServerPutTest extends TestLogger {
 			}
 
 			// check that we only uploaded the file once to the blob store
-			verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+			verify(blobStore, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
 		} finally {
 			executor.shutdownNow();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 2987c39..e449aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
-
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -32,6 +30,10 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+
 public class BlobUtilsTest {
 
 	private final static String CANNOT_CREATE_THIS = "cannot-create-this";
@@ -62,12 +64,18 @@ public class BlobUtilsTest {
 	public void testExceptionOnCreateStorageDirectoryFailure() throws
 		IOException {
 		// Should throw an Exception
-		BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+		BlobUtils.initLocalStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+	}
+
+	@Test(expected = Exception.class)
+	public void testExceptionOnCreateCacheDirectoryFailureNoJob() {
+		// Should throw an Exception
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), null, mock(BlobKey.class));
 	}
 
 	@Test(expected = Exception.class)
-	public void testExceptionOnCreateCacheDirectoryFailure() {
+	public void testExceptionOnCreateCacheDirectoryFailureForJob() {
 		// Should throw an Exception
-		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), mock(BlobKey.class));
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), mock(BlobKey.class));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 476fdcb..b43a307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -73,17 +73,20 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
-			keys.add(bc.put(buf));
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
+
+			keys.add(bc.put(jobId, buf));
 			buf[0] += 1;
-			keys.add(bc.put(buf));
+			keys.add(bc.put(jobId, buf));
 
 			bc.close();
 
-			long cleanupInterval = 1000l;
+			long cleanupInterval = 1000L;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -105,17 +108,25 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, server, false));
+			assertEquals(0, checkFilesExist(jobId, keys, server, false));
 
 			try {
-				server.getFile(keys.get(0));
-				fail("name-addressable BLOB should have been deleted");
+				if (jobId == null) {
+					server.getFile(keys.get(0));
+				} else {
+					server.getFile(jobId, keys.get(0));
+				}
+				fail("BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
 			try {
-				server.getFile(keys.get(1));
-				fail("name-addressable BLOB should have been deleted");
+				if (jobId == null) {
+					server.getFile(keys.get(1));
+				} else {
+					server.getFile(jobId, keys.get(1));
+				}
+				fail("BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
@@ -150,16 +161,20 @@ public class BlobLibraryCacheManagerTest {
 	 * @param doThrow
 	 * 		whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
 	 *
-	 * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
+	 * @return number of files we were able to retrieve via {@link BlobService#getFile}
 	 */
 	private static int checkFilesExist(
-		List<BlobKey> keys, BlobService blobService, boolean doThrow)
+			JobID jobId, List<BlobKey> keys, BlobService blobService, boolean doThrow)
 			throws IOException {
 		int numFiles = 0;
 
 		for (BlobKey key : keys) {
 			try {
-				blobService.getFile(key);
+				if (jobId == null) {
+					blobService.getFile(key);
+				} else {
+					blobService.getFile(jobId, key);
+				}
 				++numFiles;
 			} catch (IOException e) {
 				if (doThrow) {
@@ -196,22 +211,26 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
-			keys.add(bc.put(buf));
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+//			JobID jobId = new JobID();
+			JobID jobId = null;
+
+			keys.add(bc.put(jobId, buf));
 			buf[0] += 1;
-			keys.add(bc.put(buf));
+			keys.add(bc.put(jobId, buf));
 
-			long cleanupInterval = 1000l;
+			long cleanupInterval = 1000L;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
 			libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			libraryCacheManager.unregisterTask(jid, executionId1);
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -233,7 +252,7 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, server, false));
+			assertEquals(0, checkFilesExist(jobId, keys, server, false));
 
 			bc.close();
 		} finally {
@@ -269,10 +288,13 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
+
 			// upload some meaningless data to the server
 			BlobClient uploader = new BlobClient(serverAddress, config);
-			BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
-			BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+			BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+			BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
 			uploader.close();
 
 			BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
@@ -316,11 +338,12 @@ public class BlobLibraryCacheManagerTest {
 					fail("Should fail with an IllegalStateException");
 				}
 				catch (IllegalStateException e) {
-					// that#s what we want
+					// that's what we want
 				}
 			}
 
-			cacheDir = new File(cache.getStorageDir(), "cache");
+			// see BlobUtils for the directory layout
+			cacheDir = new File(cache.getStorageDir(), "no_job");
 			assertTrue(cacheDir.exists());
 
 			// make sure no further blobs can be downloaded by removing the write

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index b19835b..2f6738d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -91,10 +91,14 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 			List<BlobKey> keys = new ArrayList<>(2);
 
+			JobID jobId = new JobID();
+			// TODO: replace+adapt by jobId after adapting the BlobLibraryCacheManager
+			JobID blobJobId = null;
+
 			// Upload some data (libraries)
 			try (BlobClient client = new BlobClient(serverAddress[0], config)) {
-				keys.add(client.put(expected)); // Request 1
-				keys.add(client.put(expected, 32, 256)); // Request 2
+				keys.add(client.put(blobJobId, expected)); // Request 1
+				keys.add(client.put(blobJobId, expected, 32, 256)); // Request 2
 			}
 
 			// The cache
@@ -102,7 +106,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Register uploaded libraries
-			JobID jobId = new JobID();
 			ExecutionAttemptID executionId = new ExecutionAttemptID();
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 79b9c1c..3c75971 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -136,9 +137,11 @@ public class JobSubmitTest {
 			// upload two dummy bytes and add their keys to the job graph as dependencies
 			BlobKey key1, key2;
 			BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
 			try {
-				key1 = bc.put(new byte[10]);
-				key2 = bc.put(new byte[10]);
+				key1 = bc.put(jobId, new byte[10]);
+				key2 = bc.put(jobId, new byte[10]);
 
 				// delete one of the blobs to make sure that the startup failed
 				bc.delete(key2);


[3/3] flink git commit: [FLINK-7056][blob] add API to allow job-related BLOBs to be stored

Posted by tr...@apache.org.
[FLINK-7056][blob] add API to allow job-related BLOBs to be stored

[FLINK-7056][blob] refactor the new API for job-related BLOBs

For a cleaner API, instead of having a nullable jobId parameter, use two methods:
one for job-related BLOBs, another for job-unrelated ones.

This closes #4237.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a19c456
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a19c456
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a19c456

Branch: refs/heads/master
Commit: 0a19c456ac7781d94eb0aaaf8f2ac73d0157bacb
Parents: b7c1dfa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jun 21 18:04:43 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 14 11:06:24 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 167 +++++++--
 .../apache/flink/runtime/blob/BlobClient.java   | 361 +++++++++++++------
 .../apache/flink/runtime/blob/BlobServer.java   | 132 +++++--
 .../runtime/blob/BlobServerConnection.java      | 228 +++++++-----
 .../flink/runtime/blob/BlobServerProtocol.java  |  12 +-
 .../apache/flink/runtime/blob/BlobService.java  |  29 +-
 .../apache/flink/runtime/blob/BlobStore.java    |   6 +-
 .../apache/flink/runtime/blob/BlobUtils.java    | 151 +++++---
 .../org/apache/flink/runtime/blob/BlobView.java |   5 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  14 +-
 .../flink/runtime/blob/VoidBlobStore.java       |   7 +-
 .../apache/flink/runtime/client/JobClient.java  |   1 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |   1 +
 .../runtime/blob/BlobCacheRetriesTest.java      | 103 ++++--
 .../runtime/blob/BlobCacheSuccessTest.java      | 100 ++++-
 .../flink/runtime/blob/BlobClientTest.java      |  36 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  34 +-
 .../runtime/blob/BlobServerDeleteTest.java      | 143 ++++++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 133 ++++++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 212 +++++++++--
 .../flink/runtime/blob/BlobUtilsTest.java       |  22 +-
 .../BlobLibraryCacheManagerTest.java            |  67 ++--
 .../BlobLibraryCacheRecoveryITCase.java         |   9 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   7 +-
 24 files changed, 1503 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3e19537..29f7706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -38,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs.
  *
- * <p>When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the
+ * <p>When requesting BLOBs through the {@link BlobCache#getFile} methods, the
  * BLOB cache will first attempt to serve the file from its local cache. Only if
  * the local cache does not contain the desired BLOB, the BLOB cache will try to
  * download it from a distributed file system (if available) or the BLOB
@@ -91,7 +94,7 @@ public final class BlobCache implements BlobService {
 
 		// configure and create the storage directory
 		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
-		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
+		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB cache storage directory " + storageDir);
 
 		// configure the number of fetch retries
@@ -110,19 +113,66 @@ public final class BlobCache implements BlobService {
 	}
 
 	/**
-	 * Returns local copy of the file for the BLOB with the given key. The method will first attempt to serve
-	 * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
-	 * from this cache's BLOB server.
+	 * Returns a local copy of the (job-unrelated) file for the BLOB with the given key.
+	 * <p>
+	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+	 * the cache, the method will try to download it from this cache's BLOB server.
+	 *
+	 * @param key
+	 * 		The key of the desired BLOB.
 	 *
-	 * @param requiredBlob The key of the desired BLOB.
 	 * @return file referring to the local storage location of the BLOB.
-	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
 	 */
 	@Override
-	public File getFile(final BlobKey requiredBlob) throws IOException {
+	public File getFile(BlobKey key) throws IOException {
+		return getFileInternal(null, key);
+	}
+
+	/**
+	 * Returns a local copy of the file for the BLOB with the given key.
+	 * <p>
+	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+	 * the cache, the method will try to download it from this cache's BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param key
+	 * 		The key of the desired BLOB.
+	 *
+	 * @return file referring to the local storage location of the BLOB.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	@Override
+	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		return getFileInternal(jobId, key);
+	}
+
+	/**
+	 * Returns a local copy of the file for the BLOB with the given key.
+	 * <p>
+	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+	 * the cache, the method will try to download it from this cache's BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param requiredBlob
+	 * 		The key of the desired BLOB.
+	 *
+	 * @return file referring to the local storage location of the BLOB.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
 		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
-		final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
+		final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
 
 		if (localJarFile.exists()) {
 			return localJarFile;
@@ -130,7 +180,7 @@ public final class BlobCache implements BlobService {
 
 		// first try the distributed blob store (if available)
 		try {
-			blobView.get(requiredBlob, localJarFile);
+			blobView.get(jobId, requiredBlob, localJarFile);
 		} catch (Exception e) {
 			LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
 		}
@@ -141,14 +191,14 @@ public final class BlobCache implements BlobService {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
-		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
+		LOG.info("Downloading {}/{} from {}", jobId, requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
 			try (
 				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
-				final InputStream is = bc.get(requiredBlob);
+				final InputStream is = bc.getInternal(jobId, requiredBlob);
 				final OutputStream os = new FileOutputStream(localJarFile)
 			) {
 				while (true) {
@@ -163,7 +213,7 @@ public final class BlobCache implements BlobService {
 				return localJarFile;
 			}
 			catch (Throwable t) {
-				String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
+				String message = "Failed to fetch BLOB " + jobId + "/" + requiredBlob + " from " + serverAddress +
 					" and store it under " + localJarFile.getAbsolutePath();
 				if (attempt < numFetchRetries) {
 					if (LOG.isDebugEnabled()) {
@@ -179,41 +229,110 @@ public final class BlobCache implements BlobService {
 
 				// retry
 				++attempt;
-				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
+				LOG.info("Downloading {}/{} from {} (retry {})", jobId, requiredBlob, serverAddress, attempt);
 			}
 		} // end loop over retries
 	}
 
 	/**
-	 * Deletes the file associated with the given key from the BLOB cache.
-	 * @param key referring to the file to be deleted
+	 * Deletes the (job-unrelated) file associated with the blob key in this BLOB cache.
+	 *
+	 * @param key
+	 * 		blob key associated with the file to be deleted
+	 *
+	 * @throws IOException
 	 */
 	@Override
-	public void delete(BlobKey key) throws IOException{
-		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
+	public void delete(BlobKey key) throws IOException {
+		deleteInternal(null, key);
+	}
 
+	/**
+	 * Deletes the file associated with the blob key in this BLOB cache.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param key
+	 * 		blob key associated with the file to be deleted
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		deleteInternal(jobId, key);
+	}
+
+	/**
+	 * Deletes the file associated with the blob key in this BLOB cache.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key
+	 * 		blob key associated with the file to be deleted
+	 *
+	 * @throws IOException
+	 */
+	private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException{
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
 		if (!localFile.delete() && localFile.exists()) {
 			LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());
 		}
 	}
 
 	/**
-	 * Deletes the file associated with the given key from the BLOB cache and
+	 * Deletes the (job-unrelated) file associated with the given key from the BLOB cache and
 	 * BLOB server.
 	 *
-	 * @param key referring to the file to be deleted
+	 * @param key
+	 * 		referring to the file to be deleted
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to
-	 *         the BLOB server or if the BLOB server cannot delete the file
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
 	 */
 	public void deleteGlobal(BlobKey key) throws IOException {
+		deleteGlobalInternal(null, key);
+	}
+
+	/**
+	 * Deletes the file associated with the given key from the BLOB cache and BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param key
+	 * 		referring to the file to be deleted
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
+	 */
+	public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		deleteGlobalInternal(jobId, key);
+	}
+
+	/**
+	 * Deletes the file associated with the given key from the BLOB cache and
+	 * BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key
+	 * 		referring to the file to be deleted
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
+	 */
+	private void deleteGlobalInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
 		// delete locally
-		delete(key);
+		deleteInternal(jobId, key);
 		// then delete on the BLOB server
 		// (don't use the distributed storage directly - this way the blob
 		// server is aware of the delete operation, too)
 		try (BlobClient bc = createClient()) {
-			bc.delete(key);
+			bc.deleteInternal(jobId, key);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 0882ec3..9a2f59e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
@@ -46,7 +49,8 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
@@ -55,7 +59,7 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
-import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET),
@@ -75,6 +79,7 @@ public final class BlobClient implements Closeable {
 	 *        the network address of the BLOB server
 	 * @param clientConfig
 	 *        additional configuration like SSL parameters required to connect to the blob server
+	 *
 	 * @throws IOException
 	 *         thrown if the connection to the BLOB server could not be established
 	 */
@@ -130,22 +135,65 @@ public final class BlobClient implements Closeable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
-	 * {@link FileNotFoundException} is thrown.
-	 * 
+	 * Downloads the (job-unrelated) BLOB identified by the given BLOB key from the BLOB server.
+	 *
 	 * @param blobKey
-	 *        the BLOB key identifying the BLOB to download
+	 * 		blob key associated with the requested file
+	 *
 	 * @return an input stream to read the retrieved data from
+	 *
+	 * @throws FileNotFoundException
+	 * 		if there is no such file
 	 * @throws IOException
-	 *         thrown if an I/O error occurs during the download
+	 * 		if an I/O error occurs during the download
 	 */
 	public InputStream get(BlobKey blobKey) throws IOException {
+		return getInternal(null, blobKey);
+	}
+
+	/**
+	 * Downloads the BLOB identified by the given BLOB key from the BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param blobKey
+	 * 		blob key associated with the requested file
+	 *
+	 * @return an input stream to read the retrieved data from
+	 *
+	 * @throws FileNotFoundException
+	 * 		if there is no such file
+	 * @throws IOException
+	 * 		if an I/O error occurs during the download
+	 */
+	public InputStream get(@Nonnull JobID jobId, BlobKey blobKey) throws IOException {
+		checkNotNull(jobId);
+		return getInternal(jobId, blobKey);
+	}
+
+	/**
+	 * Downloads the BLOB identified by the given BLOB key from the BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param blobKey
+	 * 		blob key associated with the requested file
+	 *
+	 * @return an input stream to read the retrieved data from
+	 *
+	 * @throws FileNotFoundException
+	 * 		if there is no such file
+	 * @throws IOException
+	 * 		if an I/O error occurs during the download
+	 */
+	InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
 		if (this.socket.isClosed()) {
 			throw new IllegalStateException("BLOB Client is not connected. " +
 					"Client has been shut down or encountered an error before.");
 		}
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("GET content addressable BLOB %s from %s", blobKey, socket.getLocalSocketAddress()));
+			LOG.debug("GET BLOB {}/{} from {}.", jobId, blobKey,
+				socket.getLocalSocketAddress());
 		}
 
 		try {
@@ -153,8 +201,8 @@ public final class BlobClient implements Closeable {
 			InputStream is = this.socket.getInputStream();
 
 			// Send GET header
-			sendGetHeader(os, null, blobKey);
-			receiveAndCheckResponse(is);
+			sendGetHeader(os, jobId, blobKey);
+			receiveAndCheckGetResponse(is);
 
 			return new BlobInputStream(is, blobKey);
 		}
@@ -169,29 +217,40 @@ public final class BlobClient implements Closeable {
 	 *
 	 * @param outputStream
 	 *        the output stream to write the header data to
-	 * @param jobID
-	 *        the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
-	 *        to identify the BLOB on the server instead
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey
-	 *        the BLOB key to identify the BLOB to download if either the job ID or the regular key are
-	 *        <code>null</code>
+	 * 		blob key associated with the requested file
+	 *
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	private void sendGetHeader(OutputStream outputStream, JobID jobID, BlobKey blobKey) throws IOException {
-		checkArgument(jobID == null);
+	private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
+		checkNotNull(blobKey);
 
 		// Signal type of operation
 		outputStream.write(GET_OPERATION);
 
-		// Check if GET should be done in content-addressable manner
-		if (jobID == null) {
-			outputStream.write(CONTENT_ADDRESSABLE);
-			blobKey.writeToOutputStream(outputStream);
+		// Send job ID and key
+		if (jobId == null) {
+			outputStream.write(CONTENT_NO_JOB);
+		} else {
+			outputStream.write(CONTENT_FOR_JOB);
+			outputStream.write(jobId.getBytes());
 		}
+		blobKey.writeToOutputStream(outputStream);
 	}
 
-	private void receiveAndCheckResponse(InputStream is) throws IOException {
+	/**
+	 * Reads the response from the input stream and throws an exception in case of errors.
+	 *
+	 * @param is
+	 * 		stream to read from
+	 *
+	 * @throws IOException
+	 * 		if the response is an error or reading the response failed
+	 */
+	private static void receiveAndCheckGetResponse(InputStream is) throws IOException {
 		int response = is.read();
 		if (response < 0) {
 			throw new EOFException("Premature end of response");
@@ -211,82 +270,111 @@ public final class BlobClient implements Closeable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Uploads the data of the given byte array to the BLOB server in a content-addressable manner.
+	 * Uploads the data of the given byte array for the given job to the BLOB server.
 	 *
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param value
-	 *        the buffer to upload
+	 * 		the buffer to upload
+	 *
 	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
+	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public BlobKey put(byte[] value) throws IOException {
-		return put(value, 0, value.length);
+	@VisibleForTesting
+	public BlobKey put(@Nullable JobID jobId, byte[] value) throws IOException {
+		return put(jobId, value, 0, value.length);
 	}
 
 	/**
-	 * Uploads data from the given byte array to the BLOB server in a content-addressable manner.
+	 * Uploads data from the given byte array for the given job to the BLOB server.
 	 *
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param value
-	 *        the buffer to upload data from
+	 * 		the buffer to upload data from
 	 * @param offset
-	 *        the read offset within the buffer
+	 * 		the read offset within the buffer
 	 * @param len
-	 *        the number of bytes to upload from the buffer
+	 * 		the number of bytes to upload from the buffer
+	 *
 	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
+	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public BlobKey put(byte[] value, int offset, int len) throws IOException {
-		return putBuffer(null, value, offset, len);
+	@VisibleForTesting
+	public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int len) throws IOException {
+		return putBuffer(jobId, value, offset, len);
 	}
 
 	/**
-	 * Uploads the data from the given input stream to the BLOB server in a content-addressable manner.
+	 * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
 	 *
 	 * @param inputStream
-	 *        the input stream to read the data from
+	 * 		the input stream to read the data from
+	 *
 	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
-	 *         BLOB server
+	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
+	 * 		data to the BLOB server
 	 */
 	public BlobKey put(InputStream inputStream) throws IOException {
 		return putInputStream(null, inputStream);
 	}
 
 	/**
+	 * Uploads the data from the given input stream for the given job to the BLOB server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param inputStream
+	 * 		the input stream to read the data from
+	 *
+	 * @return the computed BLOB key identifying the BLOB on the server
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
+	 * 		data to the BLOB server
+	 */
+	public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) throws IOException {
+		checkNotNull(jobId);
+		return putInputStream(jobId, inputStream);
+	}
+
+	/**
 	 * Uploads data from the given byte buffer to the BLOB server.
 	 *
 	 * @param jobId
-	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
-	 *        manner
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param value
-	 *        the buffer to read the data from
+	 * 		the buffer to read the data from
 	 * @param offset
-	 *        the read offset within the buffer
+	 * 		the read offset within the buffer
 	 * @param len
-	 *        the number of bytes to read from the buffer
-	 * @return the computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
-	 *         otherwise
+	 * 		the number of bytes to read from the buffer
+	 *
+	 * @return the computed BLOB key of the uploaded BLOB
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
+	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putBuffer(JobID jobId, byte[] value, int offset, int len) throws IOException {
+	private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int len) throws IOException {
 		if (this.socket.isClosed()) {
 			throw new IllegalStateException("BLOB Client is not connected. " +
 					"Client has been shut down or encountered an error before.");
 		}
+		checkNotNull(value);
 
 		if (LOG.isDebugEnabled()) {
-			if (jobId == null) {
-				LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s",
-						len, socket.getLocalSocketAddress()));
-			}
+			LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, socket.getLocalSocketAddress());
 		}
 
 		try {
 			final OutputStream os = this.socket.getOutputStream();
-			final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
+			final MessageDigest md = BlobUtils.createMessageDigest();
 
 			// Send the PUT header
 			sendPutHeader(os, jobId);
@@ -295,15 +383,15 @@ public final class BlobClient implements Closeable {
 			int remainingBytes = len;
 
 			while (remainingBytes > 0) {
+				// want a common code path for byte[] and InputStream at the BlobServer
+				// -> since for InputStream we don't know a total size beforehand, send lengths iteratively
 				final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
 				writeLength(bytesToSend, os);
 
 				os.write(value, offset, bytesToSend);
 
-				// Update the message digest if necessary
-				if (md != null) {
-					md.update(value, offset, bytesToSend);
-				}
+				// Update the message digest
+				md.update(value, offset, bytesToSend);
 
 				remainingBytes -= bytesToSend;
 				offset += bytesToSend;
@@ -313,7 +401,7 @@ public final class BlobClient implements Closeable {
 
 			// Receive blob key and compare
 			final InputStream is = this.socket.getInputStream();
-			return receivePutResponseAndCompare(is, md);
+			return receiveAndCheckPutResponse(is, md);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -325,37 +413,36 @@ public final class BlobClient implements Closeable {
 	 * Uploads data from the given input stream to the BLOB server.
 	 *
 	 * @param jobId
-	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
-	 *        manner
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param inputStream
-	 *        the input stream to read the data from
-	 * @return he computed BLOB key if the BLOB has been stored in a content-addressable manner, <code>null</code>
-	 *         otherwise
+	 * 		the input stream to read the data from
+	 *
+	 * @return the computed BLOB key of the uploaded BLOB
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
+	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putInputStream(JobID jobId, InputStream inputStream) throws IOException {
+	private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream) throws IOException {
 		if (this.socket.isClosed()) {
 			throw new IllegalStateException("BLOB Client is not connected. " +
 					"Client has been shut down or encountered an error before.");
 		}
+		checkNotNull(inputStream);
 
 		if (LOG.isDebugEnabled()) {
-			if (jobId == null) {
-				LOG.debug(String.format("PUT content addressable BLOB stream to %s",
-						socket.getLocalSocketAddress()));
-			}
+			LOG.debug("PUT BLOB stream to {}.", socket.getLocalSocketAddress());
 		}
 
 		try {
 			final OutputStream os = this.socket.getOutputStream();
-			final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
+			final MessageDigest md = BlobUtils.createMessageDigest();
 			final byte[] xferBuf = new byte[BUFFER_SIZE];
 
 			// Send the PUT header
 			sendPutHeader(os, jobId);
 
 			while (true) {
+				// since we don't know a total size here, send lengths iteratively
 				final int read = inputStream.read(xferBuf);
 				if (read < 0) {
 					// we are done. send a -1 and be done
@@ -365,15 +452,13 @@ public final class BlobClient implements Closeable {
 				if (read > 0) {
 					writeLength(read, os);
 					os.write(xferBuf, 0, read);
-					if (md != null) {
-						md.update(xferBuf, 0, read);
-					}
+					md.update(xferBuf, 0, read);
 				}
 			}
 
 			// Receive blob key and compare
 			final InputStream is = this.socket.getInputStream();
-			return receivePutResponseAndCompare(is, md);
+			return receiveAndCheckPutResponse(is, md);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -381,16 +466,25 @@ public final class BlobClient implements Closeable {
 		}
 	}
 
-	private BlobKey receivePutResponseAndCompare(InputStream is, MessageDigest md) throws IOException {
+	/**
+	 * Reads the response from the input stream and throws an exception in case of errors.
+	 *
+	 * @param is
+	 * 		stream to read from
+	 * @param md
+	 * 		message digest to check the response against
+	 *
+	 * @throws IOException
+	 * 		if the response is an error, the message digest does not match or reading the response
+	 * 		failed
+	 */
+	private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md)
+			throws IOException {
 		int response = is.read();
 		if (response < 0) {
 			throw new EOFException("Premature end of response");
 		}
 		else if (response == RETURN_OKAY) {
-			if (md == null) {
-				// not content addressable
-				return null;
-			}
 
 			BlobKey remoteKey = BlobKey.readFromInputStream(is);
 			BlobKey localKey = new BlobKey(md.digest());
@@ -412,24 +506,24 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Constructs and writes the header data for a PUT request to the given output stream.
-	 * NOTE: If the jobId and key are null, we send the data to the content addressable section.
 	 *
 	 * @param outputStream
-	 *        the output stream to write the PUT header data to
-	 * @param jobID
-	 *        the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a
-	 *        content-addressable BLOB
+	 * 		the output stream to write the PUT header data to
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the header data to the output stream
+	 * 		thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	private void sendPutHeader(OutputStream outputStream, JobID jobID) throws IOException {
-		checkArgument(jobID == null);
-
+	private static void sendPutHeader(OutputStream outputStream, @Nullable JobID jobId) throws IOException {
 		// Signal type of operation
 		outputStream.write(PUT_OPERATION);
-
-		// Check if PUT should be done in content-addressable manner
-		outputStream.write(CONTENT_ADDRESSABLE);
+		if (jobId == null) {
+			outputStream.write(CONTENT_NO_JOB);
+		} else {
+			outputStream.write(CONTENT_FOR_JOB);
+			outputStream.write(jobId.getBytes());
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -437,16 +531,50 @@ public final class BlobClient implements Closeable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Deletes the BLOB identified by the given BLOB key from the BLOB server.
+	 * Deletes the (job-unrelated) BLOB identified by the given BLOB key from the BLOB server.
+	 *
+	 * @param key
+	 * 		the key to identify the BLOB
 	 *
-	 * @param blobKey
-	 *        the key to identify the BLOB
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to
-	 *         the BLOB server or if the BLOB server cannot delete the file
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
 	 */
-	public void delete(BlobKey blobKey) throws IOException {
-		checkArgument(blobKey != null, "BLOB key must not be null.");
+	public void delete(BlobKey key) throws IOException {
+		deleteInternal(null, key);
+	}
+
+	/**
+	 * Deletes the BLOB identified by the given BLOB key and job ID from the BLOB server.
+	 *
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to
+	 * @param key
+	 * 		the key to identify the BLOB
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
+	 */
+	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		deleteInternal(jobId, key);
+	}
+
+	/**
+	 * Deletes the BLOB identified by the given BLOB key and job ID from the BLOB server.
+	 *
+	 * @param jobId
+	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key
+	 * 		the key to identify the BLOB
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
+	 * 		BLOB server cannot delete the file
+	 */
+	public void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(key);
 
 		try {
 			final OutputStream outputStream = this.socket.getOutputStream();
@@ -456,20 +584,16 @@ public final class BlobClient implements Closeable {
 			outputStream.write(DELETE_OPERATION);
 
 			// delete blob key
-			outputStream.write(CONTENT_ADDRESSABLE);
-			blobKey.writeToOutputStream(outputStream);
-
-			int response = inputStream.read();
-			if (response < 0) {
-				throw new EOFException("Premature end of response");
-			}
-			if (response == RETURN_ERROR) {
-				Throwable cause = readExceptionFromStream(inputStream);
-				throw new IOException("Server side error: " + cause.getMessage(), cause);
-			}
-			else if (response != RETURN_OKAY) {
-				throw new IOException("Unrecognized response");
+			if (jobId == null) {
+				outputStream.write(CONTENT_NO_JOB);
+			} else {
+				outputStream.write(CONTENT_FOR_JOB);
+				outputStream.write(jobId.getBytes());
 			}
+			key.writeToOutputStream(outputStream);
+
+			// the response is the same as for a GET request
+			receiveAndCheckGetResponse(inputStream);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -479,11 +603,18 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Uploads the JAR files to a {@link BlobServer} at the given address.
+	 * <p>
+	 * TODO: add jobId to signature after adapting the BlobLibraryCacheManager
+	 *
+	 * @param serverAddress
+	 * 		Server address of the {@link BlobServer}
+	 * @param clientConfig
+	 * 		Any additional configuration for the blob client
+	 * @param jars
+	 * 		List of JAR files to upload
 	 *
-	 * @param serverAddress Server address of the {@link BlobServer}
-	 * @param clientConfig Any additional configuration for the blob client
-	 * @param jars List of JAR files to upload
-	 * @throws IOException Thrown if the upload fails
+	 * @throws IOException
+	 * 		if the upload fails
 	 */
 	public static List<BlobKey> uploadJarFiles(
 			InetSocketAddress serverAddress,
@@ -500,7 +631,7 @@ public final class BlobClient implements Closeable {
 					FSDataInputStream is = null;
 					try {
 						is = fs.open(jar);
-						final BlobKey key = blobClient.put(is);
+						final BlobKey key = blobClient.putInputStream(null, is);
 						blobKeys.add(key);
 					} finally {
 						if (is != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index add9f7f..43a060a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -28,6 +29,8 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -40,7 +43,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -58,7 +61,7 @@ public class BlobServer extends Thread implements BlobService {
 	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
 
 	/** Counter to generate unique names for temporary files. */
-	private final AtomicInteger tempFileCounter = new AtomicInteger(0);
+	private final AtomicLong tempFileCounter = new AtomicLong(0);
 
 	/** The server socket listening for incoming connections. */
 	private final ServerSocket serverSocket;
@@ -110,7 +113,7 @@ public class BlobServer extends Thread implements BlobService {
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
-		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
+		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
 		// configure the maximum number of concurrent connections
@@ -189,11 +192,12 @@ public class BlobServer extends Thread implements BlobService {
 	 *
 	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
 	 *
+	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param key identifying the file
 	 * @return file handle to the file
 	 */
-	File getStorageLocation(BlobKey key) {
-		return BlobUtils.getStorageLocation(storageDir, key);
+	File getStorageLocation(JobID jobId, BlobKey key) {
+		return BlobUtils.getStorageLocation(storageDir, jobId, key);
 	}
 
 	/**
@@ -333,20 +337,69 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
-	 * Method which retrieves the local path of a file associated with a blob key. The blob server
-	 * looks the blob key up in its local storage. If the file exists, it is returned. If the
-	 * file does not exist, it is retrieved from the HA blob store (if available) or a
-	 * FileNotFoundException is thrown.
+	 * Retrieves the local path of a (job-unrelated) file associated with a blob key.
+	 * <p>
+	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+	 * or a {@link FileNotFoundException} is thrown.
 	 *
-	 * @param requiredBlob blob key associated with the requested file
-	 * @return file referring to the local storage location of the BLOB.
-	 * @throws IOException Thrown if the file retrieval failed.
+	 * @param key
+	 * 		blob key associated with the requested file
+	 *
+	 * @return file referring to the local storage location of the BLOB
+	 *
+	 * @throws IOException
+	 * 		Thrown if the file retrieval failed.
+	 */
+	@Override
+	public File getFile(BlobKey key) throws IOException {
+		return getFileInternal(null, key);
+	}
+
+	/**
+	 * Retrieves the local path of a file associated with a job and a blob key.
+	 * <p>
+	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+	 * or a {@link FileNotFoundException} is thrown.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param key
+	 * 		blob key associated with the requested file
+	 *
+	 * @return file referring to the local storage location of the BLOB
+	 *
+	 * @throws IOException
+	 * 		Thrown if the file retrieval failed.
 	 */
 	@Override
-	public File getFile(BlobKey requiredBlob) throws IOException {
+	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		return getFileInternal(jobId, key);
+	}
+
+	/**
+	 * Retrieves the local path of a file associated with a job and a blob key.
+	 * <p>
+	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+	 * or a {@link FileNotFoundException} is thrown.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param requiredBlob
+	 * 		blob key associated with the requested file
+	 *
+	 * @return file referring to the local storage location of the BLOB
+	 *
+	 * @throws IOException
+	 * 		Thrown if the file retrieval failed.
+	 */
+	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
 		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
-		final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
 
 		if (localFile.exists()) {
 			return localFile;
@@ -354,10 +407,11 @@ public class BlobServer extends Thread implements BlobService {
 		else {
 			try {
 				// Try the blob store
-				blobStore.get(requiredBlob, localFile);
+				blobStore.get(jobId, requiredBlob, localFile);
 			}
 			catch (Exception e) {
-				throw new IOException("Failed to copy from blob store.", e);
+				throw new IOException(
+					"Failed to copy BLOB " + requiredBlob + " from blob store to " + localFile, e);
 			}
 
 			if (localFile.exists()) {
@@ -371,24 +425,58 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
-	 * This method deletes the file associated to the blob key if it exists in the local storage
-	 * of the blob server.
+	 * Deletes the (job-unrelated) file associated with the blob key in both the local storage as
+	 * well as in the HA store of the blob server.
+	 *
+	 * @param key
+	 * 		blob key associated with the file to be deleted
 	 *
-	 * @param key associated with the file to be deleted
 	 * @throws IOException
 	 */
 	@Override
 	public void delete(BlobKey key) throws IOException {
-		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
+		deleteInternal(null, key);
+	}
+
+	/**
+	 * Deletes the file associated with the blob key in both the local storage as well as in the HA
+	 * store of the blob server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 * @param key
+	 * 		blob key associated with the file to be deleted
+	 *
+	 * @throws IOException
+	 */
+	@Override
+	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+		checkNotNull(jobId);
+		deleteInternal(jobId, key);
+	}
+
+	/**
+	 * Deletes the file associated with the blob key in both the local storage as well as in the HA
+	 * store of the blob server.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key
+	 * 		blob key associated with the file to be deleted
+	 *
+	 * @throws IOException
+	 */
+	void deleteInternal(@Nullable JobID jobId, BlobKey key) throws IOException {
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
 
 		readWriteLock.writeLock().lock();
 
 		try {
 			if (!localFile.delete() && localFile.exists()) {
-				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+				LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
 			}
 
-			blobStore.delete(key);
+			blobStore.delete(jobId, key);
 		} finally {
 			readWriteLock.writeLock().unlock();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 181211d..f1054c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -39,7 +39,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
@@ -49,6 +50,7 @@ import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A BLOB connection handles a series of requests from a particular BLOB client.
@@ -83,12 +85,8 @@ class BlobServerConnection extends Thread {
 		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
 		setDaemon(true);
 
-		if (blobServer == null) {
-			throw new NullPointerException();
-		}
-
 		this.clientSocket = clientSocket;
-		this.blobServer = blobServer;
+		this.blobServer = checkNotNull(blobServer);
 		this.blobStore = blobServer.getBlobStore();
 
 		ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
@@ -167,15 +165,16 @@ class BlobServerConnection extends Thread {
 
 	/**
 	 * Handles an incoming GET request from a BLOB client.
-	 * 
+	 *
 	 * @param inputStream
-	 *        the input stream to read incoming data from
+	 * 		the input stream to read incoming data from
 	 * @param outputStream
-	 *        the output stream to send data back to the client
+	 * 		the output stream to send data back to the client
 	 * @param buf
-	 *        an auxiliary buffer for data serialization/deserialization
+	 * 		an auxiliary buffer for data serialization/deserialization
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading/writing data from/to the respective streams
+	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
 	 */
 	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
 		/*
@@ -187,25 +186,36 @@ class BlobServerConnection extends Thread {
 		 * so a local cache makes more sense.
 		 */
 
-		File blobFile;
-		int contentAddressable = -1;
-		JobID jobId = null;
-		BlobKey blobKey = null;
+		final File blobFile;
+		final JobID jobId;
+		final BlobKey blobKey;
 
 		try {
-			contentAddressable = inputStream.read();
+			final int mode = inputStream.read();
 
-			if (contentAddressable < 0) {
+			if (mode < 0) {
 				throw new EOFException("Premature end of GET request");
 			}
-			if (contentAddressable == CONTENT_ADDRESSABLE) {
-				blobKey = BlobKey.readFromInputStream(inputStream);
-				blobFile = blobServer.getStorageLocation(blobKey);
+
+			// Receive the job ID and key
+			if (mode == CONTENT_NO_JOB) {
+				jobId = null;
+			} else if (mode == CONTENT_FOR_JOB) {
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				jobId = JobID.fromByteArray(jidBytes);
+			} else {
+				throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
 			}
-			else {
-				throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+			blobKey = BlobKey.readFromInputStream(inputStream);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
+					blobKey, clientSocket.getInetAddress());
 			}
 
+			blobFile = blobServer.getStorageLocation(jobId, blobKey);
+
 			// up to here, an error can give a good message
 		}
 		catch (Throwable t) {
@@ -214,7 +224,7 @@ class BlobServerConnection extends Thread {
 				writeErrorToStream(outputStream, t);
 			}
 			catch (IOException e) {
-				// since we are in an exception case, it means not much that we could not send the error
+				// since we are in an exception case, it means that we could not send the error
 				// ignore this
 			}
 			clientSocket.close();
@@ -224,6 +234,7 @@ class BlobServerConnection extends Thread {
 		readLock.lock();
 
 		try {
+			// copy the file to local store if it does not exist yet
 			try {
 				if (!blobFile.exists()) {
 					// first we have to release the read lock in order to acquire the write lock
@@ -232,9 +243,9 @@ class BlobServerConnection extends Thread {
 
 					try {
 						if (blobFile.exists()) {
-							LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
+							LOG.debug("Blob file {} has been downloaded from the (distributed) blob store by a different connection.", blobFile);
 						} else {
-							blobStore.get(blobKey, blobFile);
+							blobStore.get(jobId, blobKey, blobFile);
 						}
 					} finally {
 						writeLock.unlock();
@@ -248,6 +259,7 @@ class BlobServerConnection extends Thread {
 					}
 				}
 
+				// enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
 				if (blobFile.length() > Integer.MAX_VALUE) {
 					throw new IOException("BLOB size exceeds the maximum size (2 GB).");
 				}
@@ -259,7 +271,7 @@ class BlobServerConnection extends Thread {
 					writeErrorToStream(outputStream, t);
 				}
 				catch (IOException e) {
-					// since we are in an exception case, it means not much that we could not send the error
+					// since we are in an exception case, it means that we could not send the error
 					// ignore this
 				}
 				clientSocket.close();
@@ -294,59 +306,48 @@ class BlobServerConnection extends Thread {
 
 	/**
 	 * Handles an incoming PUT request from a BLOB client.
-	 * 
-	 * @param inputStream The input stream to read incoming data from.
-	 * @param outputStream The output stream to send data back to the client.
-	 * @param buf An auxiliary buffer for data serialization/deserialization.
+	 *
+	 * @param inputStream
+	 * 		The input stream to read incoming data from
+	 * @param outputStream
+	 * 		The output stream to send data back to the client
+	 * @param buf
+	 * 		An auxiliary buffer for data serialization/deserialization
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
 	 */
 	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
-		JobID jobID = null;
-		MessageDigest md = null;
-
 		File incomingFile = null;
-		FileOutputStream fos = null;
 
 		try {
-			final int contentAddressable = inputStream.read();
-			if (contentAddressable < 0) {
+			final int mode = inputStream.read();
+
+			if (mode < 0) {
 				throw new EOFException("Premature end of PUT request");
 			}
 
-			if (contentAddressable == CONTENT_ADDRESSABLE) {
-				md = BlobUtils.createMessageDigest();
-			}
-			else {
+			// Receive the job ID and key
+			final JobID jobId;
+			if (mode == CONTENT_NO_JOB) {
+				jobId = null;
+			} else if (mode == CONTENT_FOR_JOB) {
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				jobId = JobID.fromByteArray(jidBytes);
+			} else {
 				throw new IOException("Unknown type of BLOB addressing.");
 			}
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received PUT request for content addressable BLOB");
+				LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
+					clientSocket.getInetAddress());
 			}
 
 			incomingFile = blobServer.createTemporaryFilename();
-			fos = new FileOutputStream(incomingFile);
+			BlobKey blobKey = readFileFully(inputStream, incomingFile, buf);
 
-			while (true) {
-				final int bytesExpected = readLength(inputStream);
-				if (bytesExpected == -1) {
-					// done
-					break;
-				}
-				if (bytesExpected > BUFFER_SIZE) {
-					throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
-				}
-
-				readFully(inputStream, buf, 0, bytesExpected, "buffer");
-				fos.write(buf, 0, bytesExpected);
-
-				if (md != null) {
-					md.update(buf, 0, bytesExpected);
-				}
-			}
-			fos.close();
-
-			BlobKey blobKey = new BlobKey(md.digest());
-			File storageFile = blobServer.getStorageLocation(blobKey);
+			File storageFile = blobServer.getStorageLocation(jobId, blobKey);
 
 			writeLock.lock();
 
@@ -369,13 +370,15 @@ class BlobServerConnection extends Thread {
 
 					// only the one moving the incoming file to its final destination is allowed to upload the
 					// file to the blob store
-					blobStore.put(storageFile, blobKey);
+					blobStore.put(storageFile, jobId, blobKey);
+				} else {
+					LOG.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
 				}
 			} catch(IOException ioe) {
 				// we failed to either create the local storage file or to upload it --> try to delete the local file
 				// while still having the write lock
-				if (storageFile.exists() && !storageFile.delete()) {
-					LOG.warn("Could not delete the storage file.");
+				if (!storageFile.delete() && storageFile.exists()) {
+					LOG.warn("Could not delete the storage file with key {} and job {}.", blobKey, jobId);
 				}
 
 				throw ioe;
@@ -403,15 +406,8 @@ class BlobServerConnection extends Thread {
 			clientSocket.close();
 		}
 		finally {
-			if (fos != null) {
-				try {
-					fos.close();
-				} catch (Throwable t) {
-					LOG.warn("Cannot close stream to BLOB staging file", t);
-				}
-			}
 			if (incomingFile != null) {
-				if (!incomingFile.delete()) {
+				if (!incomingFile.delete() && incomingFile.exists()) {
 					LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
 				}
 			}
@@ -419,27 +415,87 @@ class BlobServerConnection extends Thread {
 	}
 
 	/**
+	 * Reads a full file from <tt>inputStream</tt> into <tt>incomingFile</tt> returning its checksum.
+	 *
+	 * @param inputStream
+	 * 		stream to read from
+	 * @param incomingFile
+	 * 		file to write to
+	 * @param buf
+	 * 		An auxiliary buffer for data serialization/deserialization
+	 *
+	 * @return the received file's content hash as a BLOB key
+	 *
+	 * @throws IOException
+	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
+	 */
+	private static BlobKey readFileFully(
+			final InputStream inputStream, final File incomingFile, final byte[] buf)
+			throws IOException {
+		MessageDigest md = BlobUtils.createMessageDigest();
+		FileOutputStream fos = new FileOutputStream(incomingFile);
+
+		try {
+			while (true) {
+				final int bytesExpected = readLength(inputStream);
+				if (bytesExpected == -1) {
+					// done
+					break;
+				}
+				if (bytesExpected > BUFFER_SIZE) {
+					throw new IOException(
+						"Unexpected number of incoming bytes: " + bytesExpected);
+				}
+
+				readFully(inputStream, buf, 0, bytesExpected, "buffer");
+				fos.write(buf, 0, bytesExpected);
+
+				md.update(buf, 0, bytesExpected);
+			}
+			return new BlobKey(md.digest());
+		} finally {
+			try {
+				fos.close();
+			} catch (Throwable t) {
+				LOG.warn("Cannot close stream to BLOB staging file", t);
+			}
+		}
+	}
+
+	/**
 	 * Handles an incoming DELETE request from a BLOB client.
-	 * 
-	 * @param inputStream The input stream to read the request from.
-	 * @param outputStream The output stream to write the response to.
-	 * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
+	 *
+	 * @param inputStream
+	 * 		The input stream to read the request from.
+	 * @param outputStream
+	 * 		The output stream to write the response to.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while reading the request data from the input stream.
 	 */
 	private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
 
 		try {
-			int type = inputStream.read();
-			if (type < 0) {
+			final int mode = inputStream.read();
+
+			if (mode < 0) {
 				throw new EOFException("Premature end of DELETE request");
 			}
 
-			if (type == CONTENT_ADDRESSABLE) {
-				BlobKey key = BlobKey.readFromInputStream(inputStream);
-				blobServer.delete(key);
-			}
-			else {
-				throw new IOException("Unrecognized addressing type: " + type);
+			// Receive the job ID and key
+			final JobID jobId;
+			if (mode == CONTENT_NO_JOB) {
+				jobId = null;
+			} else if (mode == CONTENT_FOR_JOB) {
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				jobId = JobID.fromByteArray(jidBytes);
+			} else {
+				throw new IOException("Unknown type of BLOB addressing.");
 			}
+			BlobKey key = BlobKey.readFromInputStream(inputStream);
+
+			blobServer.deleteInternal(jobId, key);
 
 			outputStream.write(RETURN_OKAY);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index d8ac833..681fc81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -42,12 +42,20 @@ public class BlobServerProtocol {
 	static final byte RETURN_ERROR = 1;
 
 	/**
-	 * Internal code to identify a reference via content hash as the key.
+	 * Internal code to identify a job-unrelated reference via content hash as the key.
 	 * <p>
 	 * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
 	 * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
 	 */
-	static final byte CONTENT_ADDRESSABLE = 0;
+	static final byte CONTENT_NO_JOB = 0;
+
+	/**
+	 * Internal code to identify a job-related reference via content hash as the key.
+	 * <p>
+	 * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
+	 * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
+	 */
+	static final byte CONTENT_FOR_JOB = 3;
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 1e56f26..a78c88c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
+
+import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -28,7 +31,8 @@ import java.io.IOException;
 public interface BlobService extends Closeable {
 
 	/**
-	 * Returns the path to a local copy of the file associated with the provided blob key.
+	 * Returns the path to a local copy of the (job-unrelated) file associated with the provided
+	 * blob key.
 	 *
 	 * @param key blob key associated with the requested file
 	 * @return The path to the file.
@@ -37,9 +41,19 @@ public interface BlobService extends Closeable {
 	 */
 	File getFile(BlobKey key) throws IOException;
 
+	/**
+	 * Returns the path to a local copy of the file associated with the provided job ID and blob key.
+	 *
+	 * @param jobId ID of the job this blob belongs to
+	 * @param key blob key associated with the requested file
+	 * @return The path to the file.
+	 * @throws java.io.FileNotFoundException when the path does not exist;
+	 * @throws IOException if any other error occurs when retrieving the file
+	 */
+	File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException;
 
 	/**
-	 * Deletes the file associated with the provided blob key.
+	 * Deletes the (job-unrelated) file associated with the provided blob key.
 	 *
 	 * @param key associated with the file to be deleted
 	 * @throws IOException
@@ -47,10 +61,19 @@ public interface BlobService extends Closeable {
 	void delete(BlobKey key) throws IOException;
 
 	/**
+	 * Deletes the file associated with the provided job ID and blob key.
+	 *
+	 * @param jobId ID of the job this blob belongs to
+	 * @param key associated with the file to be deleted
+	 * @throws IOException
+	 */
+	void delete(@Nonnull JobID jobId, BlobKey key) throws IOException;
+
+	/**
 	 * Returns the port of the blob service.
 	 * @return the port of the blob service.
 	 */
 	int getPort();
-	
+
 	BlobClient createClient() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 1e8b73a..d2ea8ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -32,19 +32,21 @@ public interface BlobStore extends BlobView {
 	 * Copies the local file to the blob store.
 	 *
 	 * @param localFile The file to copy
+	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey   The ID for the file in the blob store
 	 * @throws IOException If the copy fails
 	 */
-	void put(File localFile, BlobKey blobKey) throws IOException;
+	void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
 
 	/**
 	 * Tries to delete a blob from storage.
 	 *
 	 * <p>NOTE: This also tries to delete any created directories if empty.</p>
 	 *
+	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey The blob ID
 	 */
-	void delete(BlobKey blobKey);
+	void delete(JobID jobId, BlobKey blobKey);
 
 	/**
 	 * Tries to delete all blobs for the given job from storage.

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index e8f3fe5..9a13412 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -62,6 +64,11 @@ public class BlobUtils {
 	private static final String JOB_DIR_PREFIX = "job_";
 
 	/**
+	 * The prefix of all job-unrelated directories created by the BLOB server.
+	 */
+	private static final String NO_JOB_DIR_PREFIX = "no_job";
+
+	/**
 	 * Creates a BlobStore based on the parameters set in the configuration.
 	 *
 	 * @param config
@@ -116,26 +123,29 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Creates a storage directory for a blob service.
+	 * Creates a local storage directory for a blob service under the given parent directory.
 	 *
-	 * @return the storage directory used by a BLOB service
+	 * @param basePath
+	 * 		base path, i.e. parent directory, of the storage directory to use (if <tt>null</tt> or
+	 * 		empty, the path in <tt>java.io.tmpdir</tt> will be used)
+	 *
+	 * @return a new local storage directory
 	 *
 	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or
-	 * 		is not usable
+	 * 		thrown if the local file storage cannot be created or is not usable
 	 */
-	static File initStorageDirectory(String storageDirectory) throws
-		IOException {
+	static File initLocalStorageDirectory(String basePath) throws IOException {
 		File baseDir;
-		if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
+		if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
 			baseDir = new File(System.getProperty("java.io.tmpdir"));
 		}
 		else {
-			baseDir = new File(storageDirectory);
+			baseDir = new File(basePath);
 		}
 
 		File storageDir;
 
+		// NOTE: although we will be using UUIDs, there may be collisions
 		final int MAX_ATTEMPTS = 10;
 		for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
 			storageDir = new File(baseDir, String.format(
@@ -143,7 +153,7 @@ public class BlobUtils {
 
 			// Create the storage dir if it doesn't exist. Only return it when the operation was
 			// successful.
-			if (!storageDir.exists() && storageDir.mkdirs()) {
+			if (storageDir.mkdirs()) {
 				return storageDir;
 			}
 		}
@@ -153,46 +163,108 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Returns the BLOB service's directory for incoming files. The directory is created if it did
-	 * not exist so far.
+	 * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
+	 * created if it does not exist yet.
+	 *
+	 * @param storageDir
+	 * 		storage directory used by the BLOB service
 	 *
-	 * @return the BLOB server's directory for incoming files
+	 * @return the BLOB service's directory for incoming files
 	 */
 	static File getIncomingDirectory(File storageDir) {
 		final File incomingDir = new File(storageDir, "incoming");
 
-		if (!incomingDir.mkdirs() && !incomingDir.exists()) {
-			throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
-		}
+		mkdirTolerateExisting(incomingDir, "incoming");
 
 		return incomingDir;
 	}
 
 	/**
-	 * Returns the BLOB service's directory for cached files. The directory is created if it did
-	 * not exist so far.
+	 * Makes sure a given directory exists by creating it if necessary.
 	 *
-	 * @return the BLOB server's directory for cached files
+	 * @param dir
+	 * 		directory to create
+	 * @param dirType
+	 * 		the type of the directory (included in error message if something fails)
 	 */
-	private static File getCacheDirectory(File storageDir) {
-		final File cacheDirectory = new File(storageDir, "cache");
-
-		if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
-			throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
+	private static void mkdirTolerateExisting(final File dir, final String dirType) {
+		// note: thread-safe create should try to mkdir first and then ignore the case that the
+		//       directory already existed
+		if (!dir.mkdirs() && !dir.exists()) {
+			throw new RuntimeException(
+				"Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
 		}
-
-		return cacheDirectory;
 	}
 
 	/**
 	 * Returns the (designated) physical storage location of the BLOB with the given key.
 	 *
+	 * @param storageDir
+	 * 		storage directory used by the BLOB service
 	 * @param key
-	 *        the key identifying the BLOB
+	 * 		the key identifying the BLOB
+	 * @param jobId
+	 * 		ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
+	 *
 	 * @return the (designated) physical storage location of the BLOB
 	 */
-	static File getStorageLocation(File storageDir, BlobKey key) {
-		return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
+	static File getStorageLocation(
+			@Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
+
+		mkdirTolerateExisting(file.getParentFile(), "cache");
+
+		return file;
+	}
+
+	/**
+	 * Returns the BLOB server's storage directory for BLOBs belonging to the job with the given ID
+	 * <em>without</em> creating the directory.
+	 *
+	 * @param storageDir
+	 * 		storage directory used by the BLOB service
+	 * @param jobId
+	 * 		the ID of the job to return the storage directory for
+	 *
+	 * @return the storage directory for BLOBs belonging to the job with the given ID
+	 */
+	static String getStorageLocationPath(@Nonnull String storageDir, @Nullable JobID jobId) {
+		if (jobId == null) {
+			// format: $base/no_job
+			return String.format("%s/%s", storageDir, NO_JOB_DIR_PREFIX);
+		} else {
+			// format: $base/job_$jobId
+			return String.format("%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId.toString());
+		}
+	}
+
+	/**
+	 * Returns the path for the given blob key.
+	 * <p>
+	 * The returned path can be used with the (local or HA) BLOB store file system back-end for
+	 * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
+	 * BlobKey)}.
+	 *
+	 * @param storageDir
+	 * 		storage directory used by the BLOB service
+	 * @param key
+	 * 		the key identifying the BLOB
+	 * @param jobId
+	 * 		ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
+	 *
+	 * @return the path to the given BLOB
+	 */
+	static String getStorageLocationPath(
+			@Nonnull String storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+		if (jobId == null) {
+			// format: $base/no_job/blob_$key
+			return String.format("%s/%s/%s%s",
+				storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
+		} else {
+			// format: $base/job_$jobId/blob_$key
+			return String.format("%s/%s%s/%s%s",
+				storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
+		}
 	}
 
 	/**
@@ -200,6 +272,7 @@ public class BlobUtils {
 	 *
 	 * @return a new instance of the message digest to use for the BLOB key computation
 	 */
+	@Nonnull
 	static MessageDigest createMessageDigest() {
 		try {
 			return MessageDigest.getInstance(HASHING_ALGORITHM);
@@ -333,28 +406,6 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Returns the path for the given blob key.
-	 *
-	 * <p>The returned path can be used with the state backend for recovery purposes.
-	 *
-	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}
-	 * and is used for HA.
-	 */
-	static String getRecoveryPath(String basePath, BlobKey blobKey) {
-		// format: $base/cache/blob_$key
-		return String.format("%s/cache/%s%s", basePath, BLOB_FILE_PREFIX, blobKey.toString());
-	}
-
-	/**
-	 * Returns the path for the given job ID.
-	 *
-	 * <p>The returned path can be used with the state backend for recovery purposes.
-	 */
-	static String getRecoveryPath(String basePath, JobID jobId) {
-		return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString());
-	}
-
-	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private BlobUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
index 2e2e4a7..8916d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -29,9 +31,10 @@ public interface BlobView {
 	/**
 	 * Copies a blob to a local file.
 	 *
+	 * @param jobId     ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey   The blob ID
 	 * @param localFile The local file to copy to
 	 * @throws IOException If the copy fails
 	 */
-	void get(BlobKey blobKey, File localFile) throws IOException;
+	void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 5f8058b..83abecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,8 +64,8 @@ public class FileSystemBlobStore implements BlobStoreService {
 	// - Put ------------------------------------------------------------------
 
 	@Override
-	public void put(File localFile, BlobKey blobKey) throws IOException {
-		put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
+	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+		put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
 	}
 
 	private void put(File fromFile, String toBlobPath) throws IOException {
@@ -78,8 +78,8 @@ public class FileSystemBlobStore implements BlobStoreService {
 	// - Get ------------------------------------------------------------------
 
 	@Override
-	public void get(BlobKey blobKey, File localFile) throws IOException {
-		get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
+	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+		get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile);
 	}
 
 	private void get(String fromBlobPath, File toFile) throws IOException {
@@ -112,13 +112,13 @@ public class FileSystemBlobStore implements BlobStoreService {
 	// - Delete ---------------------------------------------------------------
 
 	@Override
-	public void delete(BlobKey blobKey) {
-		delete(BlobUtils.getRecoveryPath(basePath, blobKey));
+	public void delete(JobID jobId, BlobKey blobKey) {
+		delete(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
 	}
 
 	@Override
 	public void deleteAll(JobID jobId) {
-		delete(BlobUtils.getRecoveryPath(basePath, jobId));
+		delete(BlobUtils.getStorageLocationPath(basePath, jobId));
 	}
 
 	private void delete(String blobPath) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 6e2bb53..95be569 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -29,16 +29,15 @@ import java.io.IOException;
 public class VoidBlobStore implements BlobStoreService {
 
 	@Override
-	public void put(File localFile, BlobKey blobKey) throws IOException {
+	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
 	}
 
-
 	@Override
-	public void get(BlobKey blobKey, File localFile) throws IOException {
+	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
 	}
 
 	@Override
-	public void delete(BlobKey blobKey) {
+	public void delete(JobID jobId, BlobKey blobKey) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index e3657ff..9cc6210 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,6 +234,7 @@ public class JobClient {
 			int pos = 0;
 			for (BlobKey blobKey : props.requiredJarFiles()) {
 				try {
+					// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
 					allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
 				} catch (Exception e) {
 					try {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1c68515..6b92d79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -535,6 +535,7 @@ public class JobGraph implements Serializable {
 			InetSocketAddress blobServerAddress,
 			Configuration blobClientConfig) throws IOException {
 		if (!userJars.isEmpty()) {
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
 			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
 
 			for (BlobKey blobKey : blobKeys) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index fe763fa..8c575a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -25,6 +26,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
@@ -41,7 +43,8 @@ public class BlobCacheRetriesTest {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
-	 * A test where the connection fails twice and then the get operation succeeds.
+	 * A test where the connection fails twice and then the get operation succeeds
+	 * (job-unrelated blob).
 	 */
 	@Test
 	public void testBlobFetchRetries() throws IOException {
@@ -49,15 +52,41 @@ public class BlobCacheRetriesTest {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchRetries(config, new VoidBlobStore());
+		testBlobFetchRetries(config, new VoidBlobStore(), null);
+	}
+
+	/**
+	 * A test where the connection fails twice and then the get operation succeeds
+	 * (job-related blob).
+	 */
+	@Test
+	public void testBlobForJobFetchRetries() throws IOException {
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		testBlobFetchRetries(config, new VoidBlobStore(), new JobID());
+	}
+
+	/**
+	 * A test where the connection fails twice and then the get operation succeeds
+	 * (with high availability set, job-unrelated blob).
+	 */
+	@Test
+	public void testBlobNoJobFetchRetriesHa() throws IOException {
+		testBlobFetchRetriesHa(null);
 	}
 
 	/**
 	 * A test where the connection fails twice and then the get operation succeeds
-	 * (with high availability set).
+	 * (with high availability set, job-related blob).
 	 */
 	@Test
 	public void testBlobFetchRetriesHa() throws IOException {
+		testBlobFetchRetriesHa(new JobID());
+	}
+
+	private void testBlobFetchRetriesHa(final JobID jobId) throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
@@ -70,7 +99,7 @@ public class BlobCacheRetriesTest {
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-			testBlobFetchRetries(config, blobStoreService);
+			testBlobFetchRetries(config, blobStoreService, jobId);
 		} finally {
 			if (blobStoreService != null) {
 				blobStoreService.closeAndCleanupAllData();
@@ -86,7 +115,9 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException {
+	private static void testBlobFetchRetries(
+			final Configuration config, final BlobStore blobStore, final JobID jobId)
+			throws IOException {
 		final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
 		BlobServer server = null;
@@ -104,7 +135,7 @@ public class BlobCacheRetriesTest {
 			try {
 				blobClient = new BlobClient(serverAddress, config);
 
-				key = blobClient.put(data);
+				key = blobClient.put(jobId, data);
 			}
 			finally {
 				if (blobClient != null) {
@@ -115,16 +146,13 @@ public class BlobCacheRetriesTest {
 			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail the first two times, but retry, and succeed eventually
-			URL url = cache.getFile(key).toURI().toURL();
-			InputStream is = url.openStream();
-			try {
+			File file = jobId == null ? cache.getFile(key) : cache.getFile(jobId, key);
+			URL url = file.toURI().toURL();
+			try (InputStream is = url.openStream()) {
 				byte[] received = new byte[data.length];
 				assertEquals(data.length, is.read(received));
 				assertArrayEquals(data, received);
 			}
-			finally {
-				is.close();
-			}
 		} finally {
 			if (cache != null) {
 				cache.close();
@@ -136,23 +164,50 @@ public class BlobCacheRetriesTest {
 	}
 
 	/**
-	 * A test where the connection fails too often and eventually fails the GET request.
+	 * A test where the connection fails too often and eventually fails the GET request
+	 * (job-unrelated blob).
+	 */
+	@Test
+	public void testBlobNoJobFetchWithTooManyFailures() throws IOException {
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null);
+	}
+
+	/**
+	 * A test where the connection fails too often and eventually fails the GET request (job-related
+	 * blob).
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailures() throws IOException {
+	public void testBlobForJobFetchWithTooManyFailures() throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID());
+	}
+
+	/**
+	 * A test where the connection fails too often and eventually fails the GET request
+	 * (with high availability set, job-unrelated blob).
+	 */
+	@Test
+	public void testBlobNoJobFetchWithTooManyFailuresHa() throws IOException {
+		testBlobFetchWithTooManyFailuresHa(null);
 	}
 
 	/**
 	 * A test where the connection fails twice and then the get operation succeeds
-	 * (with high availability set).
+	 * (with high availability set, job-related blob).
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailuresHa() throws IOException {
+	public void testBlobForJobFetchWithTooManyFailuresHa() throws IOException {
+		testBlobFetchWithTooManyFailuresHa(new JobID());
+	}
+
+	private void testBlobFetchWithTooManyFailuresHa(final JobID jobId) throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
@@ -165,7 +220,7 @@ public class BlobCacheRetriesTest {
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-			testBlobFetchWithTooManyFailures(config, blobStoreService);
+			testBlobFetchWithTooManyFailures(config, blobStoreService, jobId);
 		} finally {
 			if (blobStoreService != null) {
 				blobStoreService.closeAndCleanupAllData();
@@ -181,7 +236,9 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException {
+	private static void testBlobFetchWithTooManyFailures(
+		final Configuration config, final BlobStore blobStore, final JobID jobId)
+			throws IOException {
 		final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
 		BlobServer server = null;
@@ -199,7 +256,7 @@ public class BlobCacheRetriesTest {
 			try {
 				blobClient = new BlobClient(serverAddress, config);
 
-				key = blobClient.put(data);
+				key = blobClient.put(jobId, data);
 			}
 			finally {
 				if (blobClient != null) {
@@ -211,7 +268,11 @@ public class BlobCacheRetriesTest {
 
 			// trigger a download - it should fail eventually
 			try {
-				cache.getFile(key);
+				if (jobId == null) {
+					cache.getFile(key);
+				} else {
+					cache.getFile(jobId, key);
+				}
 				fail("This should fail");
 			}
 			catch (IOException e) {