You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:05 UTC

[12/14] flink git commit: [FLINK-7068][blob] Introduce permanent and transient BLOB keys

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index 9e2d165..d6fab50 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -55,7 +55,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 	public void testJobCleanup() throws IOException, InterruptedException {
 
 		JobID jobId = new JobID();
-		List<BlobKey> keys = new ArrayList<>();
+		List<PermanentBlobKey> keys = new ArrayList<>();
 		BlobServer server = null;
 		PermanentBlobCache cache = null;
 
@@ -73,9 +73,9 @@ public class BlobCacheCleanupTest extends TestLogger {
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
 			// upload blobs
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 			buf[0] += 1;
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 
 			checkFileCountForJob(2, jobId, server);
 			checkFileCountForJob(0, jobId, cache);
@@ -86,14 +86,14 @@ public class BlobCacheCleanupTest extends TestLogger {
 			checkFileCountForJob(2, jobId, server);
 			checkFileCountForJob(0, jobId, cache);
 
-			for (BlobKey key : keys) {
-				cache.getHAFile(jobId, key);
+			for (PermanentBlobKey key : keys) {
+				cache.getFile(jobId, key);
 			}
 
 			// register again (let's say, from another thread or so)
 			cache.registerJob(jobId);
-			for (BlobKey key : keys) {
-				cache.getHAFile(jobId, key);
+			for (PermanentBlobKey key : keys) {
+				cache.getFile(jobId, key);
 			}
 
 			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
@@ -109,21 +109,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 
 			// after releasing the second time, the job is up for deferred cleanup
 			cache.releaseJob(jobId);
-
-			// because we cannot guarantee that there are not thread races in the build system, we
-			// loop for a certain while until the references disappear
-			{
-				long deadline = System.currentTimeMillis() + 30_000L;
-				do {
-					Thread.sleep(100);
-				}
-				while (checkFilesExist(jobId, keys, cache, false) != 0 &&
-					System.currentTimeMillis() < deadline);
-			}
-
-			// the blob cache should no longer contain the files
-			// this fails if we exited via a timeout
-			checkFileCountForJob(0, jobId, cache);
+			verifyJobCleanup(cache, jobId, keys);
 			// server should be unaffected
 			checkFileCountForJob(2, jobId, server);
 		}
@@ -141,8 +127,8 @@ public class BlobCacheCleanupTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
-	 * registering, releasing, and re-registering jobs.
+	 * Tests that {@link PermanentBlobCache} sets the expected reference counts and cleanup timeouts
+	 * when registering, releasing, and re-registering jobs.
 	 */
 	@Test
 	public void testJobReferences() throws IOException, InterruptedException {
@@ -208,7 +194,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 		long cleanupInterval = 5L;
 
 		JobID jobId = new JobID();
-		List<BlobKey> keys = new ArrayList<BlobKey>();
+		List<PermanentBlobKey> keys = new ArrayList<>();
 		BlobServer server = null;
 		PermanentBlobCache cache = null;
 
@@ -226,9 +212,9 @@ public class BlobCacheCleanupTest extends TestLogger {
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
 			// upload blobs
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 			buf[0] += 1;
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 
 			checkFileCountForJob(2, jobId, server);
 			checkFileCountForJob(0, jobId, cache);
@@ -239,14 +225,14 @@ public class BlobCacheCleanupTest extends TestLogger {
 			checkFileCountForJob(2, jobId, server);
 			checkFileCountForJob(0, jobId, cache);
 
-			for (BlobKey key : keys) {
-				cache.getHAFile(jobId, key);
+			for (PermanentBlobKey key : keys) {
+				cache.getFile(jobId, key);
 			}
 
 			// register again (let's say, from another thread or so)
 			cache.registerJob(jobId);
-			for (BlobKey key : keys) {
-				cache.getHAFile(jobId, key);
+			for (PermanentBlobKey key : keys) {
+				cache.getFile(jobId, key);
 			}
 
 			assertEquals(2, checkFilesExist(jobId, keys, cache, true));
@@ -275,20 +261,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 			Thread.sleep((cleanupInterval * 4) / 5);
 
 			// files are up for cleanup now...wait for it:
-			// because we cannot guarantee that there are not thread races in the build system, we
-			// loop for a certain while until the references disappear
-			{
-				long deadline = System.currentTimeMillis() + 30_000L;
-				do {
-					Thread.sleep(100);
-				}
-				while (checkFilesExist(jobId, keys, cache, false) != 0 &&
-					System.currentTimeMillis() < deadline);
-			}
-
-			// the blob cache should no longer contain the files
-			// this fails if we exited via a timeout
-			checkFileCountForJob(0, jobId, cache);
+			verifyJobCleanup(cache, jobId, keys);
 			// server should be unaffected
 			checkFileCountForJob(2, jobId, server);
 		}
@@ -306,6 +279,36 @@ public class BlobCacheCleanupTest extends TestLogger {
 	}
 
 	/**
+	 * Checks that BLOBs for the given <tt>jobId</tt> are cleaned up eventually (after calling
+	 * {@link PermanentBlobCache#releaseJob(JobID)}, which is not done by this method!) (waits at
+	 * most 30s).
+	 *
+	 * @param cache
+	 * 		BLOB server
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param keys
+	 * 		keys identifying BLOBs which were previously registered for the <tt>jobId</tt>
+	 */
+	static void verifyJobCleanup(PermanentBlobCache cache, JobID jobId, List<? extends BlobKey> keys)
+		throws InterruptedException, IOException {
+		// because we cannot guarantee that there are not thread races in the build system, we
+		// loop for a certain while until the references disappear
+		{
+			long deadline = System.currentTimeMillis() + 30_000L;
+			do {
+				Thread.sleep(100);
+			}
+			while (checkFilesExist(jobId, keys, cache, false) != 0 &&
+				System.currentTimeMillis() < deadline);
+		}
+
+		// the blob cache should no longer contain the files
+		// this fails if we exited via a timeout
+		checkFileCountForJob(0, jobId, cache);
+	}
+
+	/**
 	 * Checks how many of the files given by blob keys are accessible.
 	 *
 	 * @param jobId
@@ -317,11 +320,12 @@ public class BlobCacheCleanupTest extends TestLogger {
 	 * @param doThrow
 	 * 		whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>)
 	 *
-	 * @return number of files we were able to retrieve via {@link PermanentBlobService#getHAFile}
+	 * @return number of files existing at {@link BlobServer#getStorageLocation(JobID, BlobKey)} and
+	 * {@link PermanentBlobCache#getStorageLocation(JobID, BlobKey)}, respectively
 	 */
 	public static int checkFilesExist(
-		JobID jobId, Collection<BlobKey> keys, PermanentBlobService blobService, boolean doThrow)
-		throws IOException {
+			JobID jobId, Collection<? extends BlobKey> keys, PermanentBlobService blobService, boolean doThrow)
+			throws IOException {
 
 		int numFiles = 0;
 
@@ -361,10 +365,10 @@ public class BlobCacheCleanupTest extends TestLogger {
 		final File jobDir;
 		if (blobService instanceof BlobServer) {
 			BlobServer server = (BlobServer) blobService;
-			jobDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+			jobDir = server.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
 		} else {
 			PermanentBlobCache cache = (PermanentBlobCache) blobService;
-			jobDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile();
+			jobDir = cache.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
 		}
 		File[] blobsForJob = jobDir.listFiles();
 		if (blobsForJob == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
index 0a2882d..a0e46ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.TestLogger;
 
@@ -32,6 +31,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,6 +40,8 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -48,7 +51,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests how GET requests react to corrupt files when downloaded via a {@link BlobCache}.
+ * Tests how GET requests react to corrupt files when downloaded via a {@link BlobCacheService}.
  *
  * <p>Successful GET requests are tested in conjunction wit the PUT requests.
  */
@@ -62,22 +65,22 @@ public class BlobCacheCorruptionTest extends TestLogger {
 
 	@Test
 	public void testGetFailsFromCorruptFile1() throws IOException {
-		testGetFailsFromCorruptFile(null, false, false);
+		testGetFailsFromCorruptFile(null, TRANSIENT_BLOB, false);
 	}
 
 	@Test
 	public void testGetFailsFromCorruptFile2() throws IOException {
-		testGetFailsFromCorruptFile(new JobID(), false, false);
+		testGetFailsFromCorruptFile(new JobID(), TRANSIENT_BLOB, false);
 	}
 
 	@Test
 	public void testGetFailsFromCorruptFile3() throws IOException {
-		testGetFailsFromCorruptFile(new JobID(), true, false);
+		testGetFailsFromCorruptFile(new JobID(), PERMANENT_BLOB, false);
 	}
 
 	@Test
 	public void testGetFailsFromCorruptFile4() throws IOException {
-		testGetFailsFromCorruptFile(new JobID(), true, true);
+		testGetFailsFromCorruptFile(new JobID(), PERMANENT_BLOB, true);
 	}
 
 	/**
@@ -86,19 +89,18 @@ public class BlobCacheCorruptionTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
-	 * @param highAvailability
-	 * 		whether to use HA mode accessors
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 * @param corruptOnHAStore
 	 * 		whether the file should be corrupt in the HA store (<tt>true</tt>, required
 	 * 		<tt>highAvailability</tt> to be set) or on the {@link BlobServer}'s local store
 	 * 		(<tt>false</tt>)
 	 */
-	private void testGetFailsFromCorruptFile(final JobID jobId, boolean highAvailability,
+	private void testGetFailsFromCorruptFile(final JobID jobId, BlobKey.BlobType blobType,
 			boolean corruptOnHAStore) throws IOException {
 
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -107,7 +109,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-			testGetFailsFromCorruptFile(jobId, highAvailability, corruptOnHAStore, config,
+			testGetFailsFromCorruptFile(jobId, blobType, corruptOnHAStore, config,
 				blobStoreService, exception);
 		} finally {
 			if (blobStoreService != null) {
@@ -117,13 +119,36 @@ public class BlobCacheCorruptionTest extends TestLogger {
 	}
 
 	/**
+	 * Checks the GET operation fails when the downloaded file (from HA store)
+	 * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash, using a
+	 * permanent BLOB.
+	 *
+	 * @param jobId
+	 * 		job ID
+	 * @param config
+	 * 		blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
+	 * 		and {@link HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up <tt>blobStore</tt>
+	 * @param blobStore
+	 * 		shared HA blob store to use
+	 * @param expectedException
+	 * 		expected exception rule to use
+	 */
+	public static void testGetFailsFromCorruptFile(
+			JobID jobId, Configuration config, BlobStore blobStore,
+			ExpectedException expectedException) throws IOException {
+
+		testGetFailsFromCorruptFile(jobId, PERMANENT_BLOB, true, config, blobStore,
+			expectedException);
+	}
+
+	/**
 	 * Checks the GET operation fails when the downloaded file (from {@link BlobServer} or HA store)
 	 * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
 	 *
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
-	 * @param highAvailability
-	 * 		whether to use HA mode accessors
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 * @param corruptOnHAStore
 	 * 		whether the file should be corrupt in the HA store (<tt>true</tt>, required
 	 * 		<tt>highAvailability</tt> to be set) or on the {@link BlobServer}'s local store
@@ -136,17 +161,19 @@ public class BlobCacheCorruptionTest extends TestLogger {
 	 * @param expectedException
 	 * 		expected exception rule to use
 	 */
-	public static void testGetFailsFromCorruptFile(
-			JobID jobId, boolean highAvailability, boolean corruptOnHAStore, Configuration config,
-			BlobStore blobStore, ExpectedException expectedException) throws IOException {
+	private static void testGetFailsFromCorruptFile(
+			@Nullable JobID jobId, BlobKey.BlobType blobType, boolean corruptOnHAStore,
+			Configuration config, BlobStore blobStore, ExpectedException expectedException)
+			throws IOException {
 
-		assertTrue("corrupt HA file requires a HA setup", !corruptOnHAStore || highAvailability);
+		assertTrue("corrupt HA file requires a HA setup",
+			!corruptOnHAStore || blobType == PERMANENT_BLOB);
 
 		Random rnd = new Random();
 
 		try (
 			BlobServer server = new BlobServer(config, blobStore);
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, corruptOnHAStore ? blobStore : new VoidBlobStore())) {
 
 			server.start();
@@ -155,7 +182,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = put(server, jobId, data, highAvailability);
+			BlobKey key = put(server, jobId, data, blobType);
 			assertNotNull(key);
 
 			// change server/HA store file contents to make sure that GET requests fail
@@ -186,7 +213,7 @@ public class BlobCacheCorruptionTest extends TestLogger {
 			expectedException.expectCause(CoreMatchers.allOf(instanceOf(IOException.class),
 				hasProperty("message", containsString("data corruption"))));
 
-			get(cache, jobId, key, highAvailability);
+			get(cache, jobId, key);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
index 20b0152..8ea2a5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
@@ -67,42 +68,43 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 	@Test
 	public void testDeleteTransient1() throws IOException {
-		testDeleteTransient(null, new JobID());
+		testDelete(null, new JobID());
 	}
 
 	@Test
 	public void testDeleteTransient2() throws IOException {
-		testDeleteTransient(new JobID(), null);
+		testDelete(new JobID(), null);
 	}
 
 	@Test
 	public void testDeleteTransient3() throws IOException {
-		testDeleteTransient(null, null);
+		testDelete(null, null);
 	}
 
 	@Test
 	public void testDeleteTransient4() throws IOException {
-		testDeleteTransient(new JobID(), new JobID());
+		testDelete(new JobID(), new JobID());
 	}
 
 	/**
 	 * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
-	 * them (via the {@link BlobCache}) does not influence the other.
+	 * them (via the {@link BlobCacheService}) does not influence the other.
 	 *
-	 * @param jobId1
+	 *  @param jobId1
 	 * 		first job id
 	 * @param jobId2
 	 * 		second job id
 	 */
-	private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
+	private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2)
 			throws IOException {
+		final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -113,38 +115,46 @@ public class BlobCacheDeleteTest extends TestLogger {
 			data2[0] ^= 1;
 
 			// put first BLOB
-			BlobKey key1 = put(server, jobId1, data, false);
+			TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB);
 			assertNotNull(key1);
 
 			// put two more BLOBs (same key, other key) for another job ID
-			BlobKey key2a = put(server, jobId2, data, false);
+			TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
 			assertNotNull(key2a);
 			assertEquals(key1, key2a);
-			BlobKey key2b = put(server, jobId2, data2, false);
+			TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
 			assertNotNull(key2b);
 
 			// issue a DELETE request
 			assertTrue(delete(cache, jobId1, key1));
 
-			verifyDeleted(cache, jobId1, key1, false);
-			verifyDeleted(server, jobId1, key1, false);
+			// delete only works on local cache!
+			assertTrue(server.getStorageLocation(jobId1, key1).exists());
+			// delete on server so that the cache cannot re-download
+			assertTrue(server.deleteInternal(jobId1, key1));
+			verifyDeleted(cache, jobId1, key1);
 			// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
-			if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
-				verifyContents(server, jobId2, key2a, data, false);
+			if (!sameJobId) {
+				verifyContents(server, jobId2, key2a, data);
 			}
-			verifyContents(server, jobId2, key2b, data2, false);
+			verifyContents(server, jobId2, key2b, data2);
 
 			// delete first file of second job
 			assertTrue(delete(cache, jobId2, key2a));
-			verifyDeleted(cache, jobId2, key2a, false);
-			verifyDeleted(server, jobId2, key2a, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			// delete only works on local cache (unless already deleted - key1 == key2a)!
+			assertTrue(sameJobId || server.getStorageLocation(jobId2, key2a).exists());
+			// delete on server so that the cache cannot re-download
+			assertTrue(server.deleteInternal(jobId2, key2a));
+			verifyDeleted(cache, jobId2, key2a);
+			verifyContents(server, jobId2, key2b, data2);
 
 			// delete second file of second job
-			assertTrue(delete(cache, jobId2, key2a));
-			verifyDeleted(cache, jobId2, key2a, false);
-			verifyDeleted(server, jobId2, key2a, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			assertTrue(delete(cache, jobId2, key2b));
+			// delete only works on local cache (unless already deleted - key1 == key2a)!
+			assertTrue(server.getStorageLocation(jobId2, key2b).exists());
+			// delete on server so that the cache cannot re-download
+			assertTrue(server.deleteInternal(jobId2, key2b));
+			verifyDeleted(cache, jobId2, key2b);
 		}
 	}
 
@@ -160,7 +170,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 	/**
 	 * Uploads a byte array for the given job and verifies that deleting it (via the {@link
-	 * BlobCache}) does not fail independent of whether the file exists.
+	 * BlobCacheService}) does not fail independent of whether the file exists.
 	 *
 	 * @param jobId
 	 * 		job id
@@ -172,7 +182,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -181,7 +191,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put BLOB
-			BlobKey key = put(server, jobId, data, false);
+			TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 			assertNotNull(key);
 
 			File blobFile = server.getStorageLocation(jobId, key);
@@ -189,11 +199,11 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 			// DELETE operation should not fail if file is already deleted
 			assertTrue(delete(cache, jobId, key));
-			verifyDeleted(cache, jobId, key, false);
+			verifyDeleted(cache, jobId, key);
 
 			// one more delete call that should not fail
 			assertTrue(delete(cache, jobId, key));
-			verifyDeleted(cache, jobId, key, false);
+			verifyDeleted(cache, jobId, key);
 		}
 	}
 
@@ -209,8 +219,8 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 	/**
 	 * Uploads a byte array for the given job and verifies that a delete operation (via the {@link
-	 * BlobCache}) does not fail even if the file is not deletable locally, e.g. via restricting
-	 * the permissions.
+	 * BlobCacheService}) does not fail even if the file is not deletable locally, e.g. via
+	 * restricting the permissions.
 	 *
 	 * @param jobId
 	 * 		job id
@@ -225,7 +235,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 		File directory = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -235,13 +245,13 @@ public class BlobCacheDeleteTest extends TestLogger {
 				rnd.nextBytes(data);
 
 				// put BLOB
-				BlobKey key = put(server, jobId, data, false);
+				TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 				assertNotNull(key);
 
 				// access from cache once to have it available there
-				verifyContents(cache, jobId, key, data, false);
+				verifyContents(cache, jobId, key, data);
 
-				blobFile = cache.getTransientBlobStore().getStorageLocation(jobId, key);
+				blobFile = cache.getTransientBlobService().getStorageLocation(jobId, key);
 				directory = blobFile.getParentFile();
 
 				assertTrue(blobFile.setWritable(false, false));
@@ -249,10 +259,10 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 				// issue a DELETE request
 				assertFalse(delete(cache, jobId, key));
-				verifyDeleted(server, jobId, key, false);
+				verifyDeleted(server, jobId, key);
 
 				// the file should still be there on both cache and server
-				verifyContents(cache, jobId, key, data, false);
+				verifyContents(cache, jobId, key, data);
 			} finally {
 				if (blobFile != null && directory != null) {
 					//noinspection ResultOfMethodCallIgnored
@@ -265,83 +275,14 @@ public class BlobCacheDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteTransientRemoteFailsNoJob() throws IOException {
-		testDeleteTransientRemoteFails(null);
-	}
-
-	@Test
-	public void testDeleteTransientRemoteFailsForJob() throws IOException {
-		testDeleteTransientRemoteFails(new JobID());
-	}
-
-	/**
-	 * Uploads a byte array for the given job and verifies that a delete operation (via the {@link
-	 * BlobCache}) does not fail even if the file is not deletable on the {@link BlobServer}, e.g.
-	 * via restricting the permissions.
-	 *
-	 * @param jobId
-	 * 		job id
-	 */
-	private void testDeleteTransientRemoteFails(@Nullable final JobID jobId) throws IOException {
-		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
-		final Configuration config = new Configuration();
-		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-
-		File blobFile = null;
-		File directory = null;
-
-		try (
-			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
-				config, new VoidBlobStore())) {
-
-			server.start();
-
-			try {
-				byte[] data = new byte[2000000];
-				rnd.nextBytes(data);
-
-				// put BLOB
-				BlobKey key = put(server, jobId, data, false);
-				assertNotNull(key);
-
-				// access from cache once to have it available there
-				verifyContents(cache, jobId, key, data, false);
-
-				blobFile = server.getStorageLocation(jobId, key);
-				directory = blobFile.getParentFile();
-
-				assertTrue(blobFile.setWritable(false, false));
-				assertTrue(directory.setWritable(false, false));
-
-				// issue a DELETE request
-				assertFalse(delete(cache, jobId, key));
-				File blobFileAtCache = cache.getTransientBlobStore().getStorageLocation(jobId, key);
-				assertFalse(blobFileAtCache.exists());
-
-				// the file should still be there on the server
-				verifyContents(server, jobId, key, data, false);
-				// ... and may be retrieved by the cache
-				verifyContents(cache, jobId, key, data, false);
-			} finally {
-				if (blobFile != null && directory != null) {
-					//noinspection ResultOfMethodCallIgnored
-					blobFile.setWritable(true, false);
-					//noinspection ResultOfMethodCallIgnored
-					directory.setWritable(true, false);
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsNoJobTransient()
+			throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(null);
 	}
 
 	@Test
-	public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsForJobTransient()
+			throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(new JobID());
 	}
 
@@ -352,9 +293,9 @@ public class BlobCacheDeleteTest extends TestLogger {
 	 * blob file exist and then one of them fails deleting it. Without the introduced lock, this
 	 * situation should rarely happen and make this test fail. Thus, if this test should become
 	 * "unstable", then the delete atomicity is most likely broken.
-	 *
-	 * @param jobId
+	 *  @param jobId
 	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
+	 *
 	 */
 	private void testConcurrentDeleteOperations(@Nullable final JobID jobId)
 			throws IOException, InterruptedException, ExecutionException {
@@ -371,12 +312,13 @@ public class BlobCacheDeleteTest extends TestLogger {
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
 
-			final BlobKey blobKey = put(server, jobId, data, false);
+			final TransientBlobKey blobKey =
+				(TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 
 			assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 
@@ -386,8 +328,9 @@ public class BlobCacheDeleteTest extends TestLogger {
 						() -> {
 							try {
 								assertTrue(delete(cache, jobId, blobKey));
-								assertFalse(cache.getTransientBlobStore().getStorageLocation(jobId, blobKey).exists());
-								assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+								assertFalse(cache.getTransientBlobService().getStorageLocation(jobId, blobKey).exists());
+								// delete only works on local cache!
+								assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 								return null;
 							} catch (IOException e) {
 								throw new CompletionException(new FlinkException(
@@ -404,7 +347,8 @@ public class BlobCacheDeleteTest extends TestLogger {
 			// in case of no lock, one of the delete operations should eventually fail
 			waitFuture.get();
 
-			assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+			// delete only works on local cache!
+			assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 
 		} finally {
 			executor.shutdownNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index 52adc4d..bed27d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -35,6 +36,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.AccessDeniedException;
@@ -51,24 +53,37 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
+import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
 import static org.apache.flink.runtime.blob.BlobUtils.NO_JOB_DIR_PREFIX;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests how failing GET requests behave in the presence of failures when used with a {@link
- * BlobCache}.
+ * Tests for GET-specific parts of the {@link BlobCacheService}.
  *
- * <p>Successful GET requests are tested in conjunction wit the PUT requests.
+ * This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from
+ * the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures
+ * when used with a {@link BlobCacheService}.
+ *
+ * <p>Most successful GET requests are tested in conjunction wit the PUT requests by {@link
+ * BlobCachePutTest}.
  */
 public class BlobCacheGetTest extends TestLogger {
 
@@ -81,23 +96,23 @@ public class BlobCacheGetTest extends TestLogger {
 	public final ExpectedException exception = ExpectedException.none();
 
 	@Test
-	public void testGetFailsDuringLookup1() throws IOException {
-		testGetFailsDuringLookup(null, new JobID(), false);
+	public void testGetTransientFailsDuringLookup1() throws IOException, InterruptedException {
+		testGetFailsDuringLookup(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookup2() throws IOException {
-		testGetFailsDuringLookup(new JobID(), new JobID(), false);
+	public void testGetTransientFailsDuringLookup2() throws IOException, InterruptedException {
+		testGetFailsDuringLookup(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookup3() throws IOException {
-		testGetFailsDuringLookup(new JobID(), null, false);
+	public void testGetTransientFailsDuringLookup3() throws IOException, InterruptedException {
+		testGetFailsDuringLookup(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookupHa() throws IOException {
-		testGetFailsDuringLookup(new JobID(), new JobID(), true);
+	public void testGetFailsDuringLookupHa() throws IOException, InterruptedException {
+		testGetFailsDuringLookup(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -105,15 +120,17 @@ public class BlobCacheGetTest extends TestLogger {
 	 *
 	 * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
 	 * @param jobId2 second job ID different to <tt>jobId1</tt>
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2, boolean highAvailability)
-		throws IOException {
+	private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2, BlobKey.BlobType blobType)
+			throws IOException, InterruptedException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -122,75 +139,79 @@ public class BlobCacheGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = put(server, jobId1, data, highAvailability);
+			BlobKey key = put(server, jobId1, data, blobType);
 			assertNotNull(key);
+			verifyType(blobType, key);
 
 			// delete file to make sure that GET requests fail
 			File blobFile = server.getStorageLocation(jobId1, key);
 			assertTrue(blobFile.delete());
 
 			// issue a GET request that fails
-			verifyDeleted(cache, jobId1, key, highAvailability);
+			verifyDeleted(cache, jobId1, key);
 
 			// add the same data under a second jobId
-			BlobKey key2 = put(server, jobId2, data, highAvailability);
+			BlobKey key2 = put(server, jobId2, data, blobType);
 			assertNotNull(key);
 			assertEquals(key, key2);
 
 			// request for jobId2 should succeed
-			get(cache, jobId2, key, highAvailability);
+			get(cache, jobId2, key);
 			// request for jobId1 should still fail
-			verifyDeleted(cache, jobId1, key, highAvailability);
-
-			// delete on cache, try to retrieve again
-			if (highAvailability) {
-				blobFile = cache.getPermanentBlobStore().getStorageLocation(jobId2, key);
-			} else {
-				blobFile = cache.getTransientBlobStore().getStorageLocation(jobId2, key);
-			}
-			assertTrue(blobFile.delete());
-			get(cache, jobId2, key, highAvailability);
-
-			// delete on cache and server, verify that it is not accessible anymore
-			if (highAvailability) {
-				blobFile = cache.getPermanentBlobStore().getStorageLocation(jobId2, key);
+			verifyDeleted(cache, jobId1, key);
+
+			if (blobType == PERMANENT_BLOB) {
+				// still existing on server
+				assertTrue(server.getStorageLocation(jobId2, key).exists());
+				// delete jobId2 on cache
+				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+				assertTrue(blobFile.delete());
+				// try to retrieve again
+				get(cache, jobId2, key);
+
+				// delete on cache and server, verify that it is not accessible anymore
+				blobFile = cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+				assertTrue(blobFile.delete());
+				blobFile = server.getStorageLocation(jobId2, key);
+				assertTrue(blobFile.delete());
+				verifyDeleted(cache, jobId2, key);
 			} else {
-				blobFile = cache.getTransientBlobStore().getStorageLocation(jobId2, key);
+				// deleted eventually on the server by the GET request above
+				verifyDeletedEventually(server, jobId2, key);
+				// delete jobId2 on cache
+				blobFile = cache.getTransientBlobService().getStorageLocation(jobId2, key);
+				assertTrue(blobFile.delete());
+				// verify that it is not accessible anymore
+				verifyDeleted(cache, jobId2, key);
 			}
-			assertTrue(blobFile.delete());
-			blobFile = server.getStorageLocation(jobId2, key);
-			assertTrue(blobFile.delete());
-			verifyDeleted(cache, jobId2, key, highAvailability);
 		}
 	}
 
 	@Test
 	public void testGetFailsIncomingNoJob() throws IOException {
-		testGetFailsIncoming(null, false);
+		testGetFailsIncoming(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testGetFailsIncomingForJob() throws IOException {
-		testGetFailsIncoming(new JobID(), false);
+		testGetFailsIncoming(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testGetFailsIncomingForJobHa() throws IOException {
-		testGetFailsIncoming(new JobID(), true);
+		testGetFailsIncoming(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
-	 * Retrieves a BLOB via a {@link BlobCache} which cannot create incoming files. File transfers
-	 * should fail.
+	 * Retrieves a BLOB via a {@link BlobCacheService} which cannot create incoming files. File
+	 * transfers should fail.
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to retrieve a permanent blob (<tt>true</tt>, via {@link
-	 * 		PermanentBlobCache#getHAFile(JobID, BlobKey)}) or not (<tt>false</tt>, via {@link
-	 * 		TransientBlobCache#getFile(JobID, BlobKey)})
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testGetFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+	private void testGetFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -200,7 +221,7 @@ public class BlobCacheGetTest extends TestLogger {
 		File tempFileDir = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -208,15 +229,16 @@ public class BlobCacheGetTest extends TestLogger {
 			// store the data on the server
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
-			BlobKey blobKey = put(server, jobId, data, highAvailability);
+			BlobKey blobKey = put(server, jobId, data, blobType);
+			verifyType(blobType, blobKey);
 
 			// make sure the blob cache cannot create any files in its storage dir
-			if (highAvailability) {
+			if (blobType == PERMANENT_BLOB) {
 				tempFileDir =
-					cache.getPermanentBlobStore().createTemporaryFilename().getParentFile();
+					cache.getPermanentBlobService().createTemporaryFilename().getParentFile();
 			} else {
 				tempFileDir =
-					cache.getTransientBlobStore().createTemporaryFilename().getParentFile();
+					cache.getTransientBlobService().createTemporaryFilename().getParentFile();
 			}
 			assertTrue(tempFileDir.setExecutable(true, false));
 			assertTrue(tempFileDir.setReadable(true, false));
@@ -227,7 +249,7 @@ public class BlobCacheGetTest extends TestLogger {
 			exception.expectMessage("Failed to fetch BLOB ");
 
 			try {
-				get(cache, jobId, blobKey, highAvailability);
+				get(cache, jobId, blobKey);
 			} finally {
 				HashSet<String> expectedDirs = new HashSet<>();
 				expectedDirs.add("incoming");
@@ -254,6 +276,9 @@ public class BlobCacheGetTest extends TestLogger {
 					File noJobDir = new File(tempFileDir.getParentFile(), NO_JOB_DIR_PREFIX);
 					assertArrayEquals(new String[] {}, noJobDir.list());
 				}
+
+				// file should still be there on the server (even if transient)
+				assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 			}
 		} finally {
 			// set writable again to make sure we can remove the directory
@@ -265,33 +290,31 @@ public class BlobCacheGetTest extends TestLogger {
 	}
 
 	@Test
-	public void testGetFailsStoreNoJob() throws IOException {
-		testGetFailsStore(null, false);
+	public void testGetTransientFailsStoreNoJob() throws IOException, InterruptedException {
+		testGetFailsStore(null, TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsStoreForJob() throws IOException {
-		testGetFailsStore(new JobID(), false);
+	public void testGetTransientFailsStoreForJob() throws IOException, InterruptedException {
+		testGetFailsStore(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsStoreForJobHa() throws IOException {
-		testGetFailsStore(new JobID(), true);
+	public void testGetPermanentFailsStoreForJob() throws IOException, InterruptedException {
+		testGetFailsStore(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
-	 * Retrieves a BLOB via a {@link BlobCache} which cannot create the final storage file. File
-	 * transfers should fail.
+	 * Retrieves a BLOB via a {@link BlobCacheService} which cannot create the final storage file.
+	 * File transfers should fail.
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to retrieve a permanent blob (<tt>true</tt>, via {@link
-	 * 		PermanentBlobCache#getHAFile(JobID, BlobKey)}) or not (<tt>false</tt>, via {@link
-	 * 		TransientBlobCache#getFile(JobID, BlobKey)})
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testGetFailsStore(@Nullable final JobID jobId, boolean highAvailability)
-			throws IOException {
+	private void testGetFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
+			throws IOException, InterruptedException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		final Configuration config = new Configuration();
@@ -300,7 +323,7 @@ public class BlobCacheGetTest extends TestLogger {
 		File jobStoreDir = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -308,14 +331,17 @@ public class BlobCacheGetTest extends TestLogger {
 			// store the data on the server
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
-			BlobKey blobKey = put(server, jobId, data, highAvailability);
+			BlobKey blobKey = put(server, jobId, data, blobType);
+			verifyType(blobType, blobKey);
 
 			// make sure the blob cache cannot create any files in its storage dir
-			if (highAvailability) {
-				jobStoreDir = cache.getPermanentBlobStore().getStorageLocation(jobId, new BlobKey())
+			if (blobType == PERMANENT_BLOB) {
+				jobStoreDir = cache.getPermanentBlobService()
+					.getStorageLocation(jobId, new PermanentBlobKey())
 					.getParentFile();
 			} else {
-				jobStoreDir = cache.getTransientBlobStore().getStorageLocation(jobId, new BlobKey())
+				jobStoreDir = cache.getTransientBlobService()
+					.getStorageLocation(jobId, new TransientBlobKey())
 					.getParentFile();
 			}
 			assertTrue(jobStoreDir.setExecutable(true, false));
@@ -326,7 +352,7 @@ public class BlobCacheGetTest extends TestLogger {
 			exception.expect(AccessDeniedException.class);
 
 			try {
-				get(cache, jobId, blobKey, highAvailability);
+				get(cache, jobId, blobKey);
 			} finally {
 				// there should be no remaining incoming files
 				File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -334,6 +360,14 @@ public class BlobCacheGetTest extends TestLogger {
 
 				// there should be no files in the job directory
 				assertArrayEquals(new String[] {}, jobStoreDir.list());
+
+				// if transient, the get will fail but since the download was successful, the file
+				// will not be on the server anymore
+				if (blobType == TRANSIENT_BLOB) {
+					verifyDeletedEventually(server, jobId, blobKey);
+				} else {
+					assertTrue(server.getStorageLocation(jobId, blobKey).exists());
+				}
 			}
 		} finally {
 			// set writable again to make sure we can remove the directory
@@ -357,7 +391,7 @@ public class BlobCacheGetTest extends TestLogger {
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -365,7 +399,7 @@ public class BlobCacheGetTest extends TestLogger {
 			// store the data on the server (and blobStore), remove from local server store
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
-			BlobKey blobKey = put(server, jobId, data, true);
+			PermanentBlobKey blobKey = (PermanentBlobKey) put(server, jobId, data, PERMANENT_BLOB);
 			assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 
 			File tempFileDir = server.createTemporaryFilename().getParentFile();
@@ -375,7 +409,7 @@ public class BlobCacheGetTest extends TestLogger {
 			exception.expectMessage("Failed to fetch BLOB ");
 
 			try {
-				get(cache, jobId, blobKey, true);
+				get(cache, jobId, blobKey);
 			} finally {
 				HashSet<String> expectedDirs = new HashSet<>();
 				expectedDirs.add("incoming");
@@ -393,6 +427,77 @@ public class BlobCacheGetTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testGetTransientRemoteDeleteFailsNoJob() throws IOException {
+		testGetTransientRemoteDeleteFails(null);
+	}
+
+	@Test
+	public void testGetTransientRemoteDeleteFailsForJob() throws IOException {
+		testGetTransientRemoteDeleteFails(new JobID());
+	}
+
+	/**
+	 * Uploads a byte array for the given job and verifies that a get operation of a transient BLOB
+	 * (via the {@link BlobCacheService}; also deletes the file on the {@link BlobServer}) does not
+	 * fail even if the file is not deletable on the {@link BlobServer}, e.g. via restricting the
+	 * permissions.
+	 *
+	 * @param jobId
+	 * 		job id
+	 */
+	private void testGetTransientRemoteDeleteFails(@Nullable final JobID jobId) throws IOException {
+		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
+
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+		File blobFile = null;
+		File directory = null;
+
+		try (
+			BlobServer server = new BlobServer(config, new VoidBlobStore());
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
+				config, new VoidBlobStore())) {
+
+			server.start();
+
+			try {
+				byte[] data = new byte[2000000];
+				rnd.nextBytes(data);
+
+				// put BLOB
+				TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
+				assertNotNull(key);
+
+				blobFile = server.getStorageLocation(jobId, key);
+				directory = blobFile.getParentFile();
+
+				assertTrue(blobFile.setWritable(false, false));
+				assertTrue(directory.setWritable(false, false));
+
+				// access from cache once which also deletes the file on the server
+				verifyContents(cache, jobId, key, data);
+				// delete locally (should not be affected by the server)
+				assertTrue(delete(cache, jobId, key));
+				File blobFileAtCache = cache.getTransientBlobService().getStorageLocation(jobId, key);
+				assertFalse(blobFileAtCache.exists());
+
+				// the file should still be there on the server
+				verifyContents(server, jobId, key, data);
+				// ... and may be retrieved by the cache
+				verifyContents(cache, jobId, key, data);
+			} finally {
+				if (blobFile != null && directory != null) {
+					//noinspection ResultOfMethodCallIgnored
+					blobFile.setWritable(true, false);
+					//noinspection ResultOfMethodCallIgnored
+					directory.setWritable(true, false);
+				}
+			}
+		}
+	}
+
 	/**
 	 * FLINK-6020
 	 *
@@ -400,22 +505,22 @@ public class BlobCacheGetTest extends TestLogger {
 	 */
 	@Test
 	public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(null, false, false);
+		testConcurrentGetOperations(null, TRANSIENT_BLOB, false);
 	}
 
 	@Test
 	public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID(), false, false);
+		testConcurrentGetOperations(new JobID(), TRANSIENT_BLOB, false);
 	}
 
 	@Test
 	public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID(), true, false);
+		testConcurrentGetOperations(new JobID(), PERMANENT_BLOB, false);
 	}
 
 	@Test
 	public void testConcurrentGetOperationsForJobHa2() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID(), true, true);
+		testConcurrentGetOperations(new JobID(), PERMANENT_BLOB, true);
 	}
 
 	/**
@@ -424,12 +529,12 @@ public class BlobCacheGetTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
-	 * @param highAvailability
-	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 * @param cacheAccessesHAStore
 	 * 		whether the cache has access to the {@link BlobServer}'s HA store or not
 	 */
-	private void testConcurrentGetOperations(final JobID jobId, final boolean highAvailability,
+	private void testConcurrentGetOperations(final JobID jobId, final BlobKey.BlobType blobType,
 			final boolean cacheAccessesHAStore)
 			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
@@ -446,19 +551,19 @@ public class BlobCacheGetTest extends TestLogger {
 		MessageDigest md = BlobUtils.createMessageDigest();
 
 		// create the correct blob key by hashing our input data
-		final BlobKey blobKey = new BlobKey(md.digest(data));
+		final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
 
 		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
 		try (
 			final BlobServer server = new BlobServer(config, blobStoreServer);
-			final BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			final BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, cacheAccessesHAStore ? blobStoreServer : blobStoreCache)) {
 
 			server.start();
 
 			// upload data first
-			assertEquals(blobKey, put(server, jobId, data, highAvailability));
+			assertEquals(blobKey, put(server, jobId, data, blobType));
 
 			// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
 			for (int i = 0; i < numberConcurrentGetOperations; i++) {
@@ -466,13 +571,13 @@ public class BlobCacheGetTest extends TestLogger {
 					.supplyAsync(
 						() -> {
 							try {
-								File file = get(cache, jobId, blobKey, highAvailability);
+								File file = get(cache, jobId, blobKey);
 								// check that we have read the right data
 								validateGetAndClose(new FileInputStream(file), data);
 								return file;
 							} catch (IOException e) {
 								throw new CompletionException(new FlinkException(
-									"Could not upload blob.", e));
+									"Could not read blob for key " + blobKey + '.', e));
 							}
 						}, executor);
 
@@ -481,8 +586,29 @@ public class BlobCacheGetTest extends TestLogger {
 
 			FutureUtils.ConjunctFuture<Collection<File>> filesFuture = FutureUtils.combineAll(getOperations);
 
-			// wait until all operations have completed and check that no exception was thrown
-			filesFuture.get();
+			if (blobType == PERMANENT_BLOB) {
+				// wait until all operations have completed and check that no exception was thrown
+				filesFuture.get();
+			} else {
+				// wait for all futures to complete (do not abort on expected exceptions) and check
+				// that at least one succeeded
+				int completedSuccessfully = 0;
+				for (CompletableFuture<File> op : getOperations) {
+					try {
+						op.get();
+						++completedSuccessfully;
+					} catch (Throwable t) {
+						// transient BLOBs get deleted upon first access and only one request will be successful while all others will have an IOException caused by a FileNotFoundException
+						if (!(ExceptionUtils.getRootCause(t) instanceof FileNotFoundException)) {
+							// ignore
+							org.apache.flink.util.ExceptionUtils.rethrowIOException(t);
+						}
+					}
+				}
+				// multiple clients may have accessed the BLOB successfully before it was
+				// deleted, but always at least one:
+				assertThat(completedSuccessfully, greaterThanOrEqualTo(1));
+			}
 		} finally {
 			executor.shutdownNow();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index 7066b95..aa23c80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -55,6 +55,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 
+import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.ChunkedInputStream;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
@@ -154,7 +159,7 @@ public class BlobCachePutTest extends TestLogger {
 
 			server.start();
 
-			BlobKey key = new BlobKey();
+			BlobKey key = new TransientBlobKey();
 			CheckedThread[] threads = new CheckedThread[] {
 				new TransientBlobCacheGetStorageLocation(cache, jobId, key),
 				new TransientBlobCacheGetStorageLocation(cache, jobId, key),
@@ -180,7 +185,7 @@ public class BlobCachePutTest extends TestLogger {
 
 			server.start();
 
-			BlobKey key = new BlobKey();
+			BlobKey key = new PermanentBlobKey();
 			CheckedThread[] threads = new CheckedThread[] {
 				new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
 				new PermanentBlobCacheGetStorageLocation(cache, jobId, key),
@@ -213,53 +218,51 @@ public class BlobCachePutTest extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testPutBufferSuccessfulGet1() throws IOException {
-		testPutBufferSuccessfulGet(null, null, false);
+	public void testPutBufferTransientSuccessfulGet1() throws IOException, InterruptedException {
+		testPutBufferSuccessfulGet(null, null, TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testPutBufferSuccessfulGet2() throws IOException {
-		testPutBufferSuccessfulGet(null, new JobID(), false);
+	public void testPutBufferTransientSuccessfulGet2() throws IOException, InterruptedException {
+		testPutBufferSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testPutBufferSuccessfulGet3() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), new JobID(), false);
+	public void testPutBufferTransientSuccessfulGet3() throws IOException, InterruptedException {
+		testPutBufferSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testPutBufferSuccessfulGet4() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), null, false);
+	public void testPutBufferTransientSuccessfulGet4() throws IOException, InterruptedException {
+		testPutBufferSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testPutBufferSuccessfulGetHa() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), new JobID(), true);
+	public void testPutBufferPermanentSuccessfulGet() throws IOException, InterruptedException {
+		testPutBufferSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
-	 * Uploads two byte arrays for different jobs into the server via the {@link BlobCache}. File
-	 * transfers should be successful.
+	 * Uploads two byte arrays for different jobs into the server via the {@link BlobCacheService}.
+	 * File transfers should be successful.
 	 *
 	 * @param jobId1
 	 * 		first job id
 	 * @param jobId2
 	 * 		second job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>, via {@link
-	 * 		BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
-	 * 		(<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testPutBufferSuccessfulGet(
-			@Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
-			throws IOException {
+			@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
+			throws IOException, InterruptedException {
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -269,58 +272,86 @@ public class BlobCachePutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(cache, jobId1, data, highAvailability);
+			BlobKey key1a = put(cache, jobId1, data, blobType);
 			assertNotNull(key1a);
+			verifyType(blobType, key1a);
 
-			BlobKey key1b = put(cache, jobId1, data2, highAvailability);
+			BlobKey key1b = put(cache, jobId1, data2, blobType);
 			assertNotNull(key1b);
+			verifyType(blobType, key1b);
 
 			// files should be available on the server
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(cache, jobId2, data, highAvailability);
+			BlobKey key2a = put(cache, jobId2, data, blobType);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(cache, jobId2, data2, highAvailability);
+			BlobKey key2b = put(cache, jobId2, data2, blobType);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
-			verifyContents(server, jobId2, key2a, data, highAvailability);
-			verifyContents(server, jobId2, key2b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// now verify we can access the BLOBs from the cache
+			verifyContents(cache, jobId1, key1a, data);
+			verifyContents(cache, jobId1, key1b, data2);
+			verifyContents(cache, jobId2, key2a, data);
+			verifyContents(cache, jobId2, key2b, data2);
+
+			// transient BLOBs should be deleted from the server, eventually
+			if (blobType == TRANSIENT_BLOB) {
+				verifyDeletedEventually(server, jobId1, key1a);
+				verifyDeletedEventually(server, jobId1, key1b);
+				verifyDeletedEventually(server, jobId2, key2a);
+				verifyDeletedEventually(server, jobId2, key2b);
+
+				// the files are still there on the cache though
+				verifyContents(cache, jobId1, key1a, data);
+				verifyContents(cache, jobId1, key1b, data2);
+				verifyContents(cache, jobId2, key2a, data);
+				verifyContents(cache, jobId2, key2b, data2);
+			} else {
+				// still on the server for permanent BLOBs after accesses from a cache
+				verifyContents(server, jobId1, key1a, data);
+				verifyContents(server, jobId1, key1b, data2);
+				verifyContents(server, jobId2, key2a, data);
+				verifyContents(server, jobId2, key2b, data2);
+			}
 		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testPutStreamSuccessfulGet1() throws IOException {
-		testPutStreamSuccessfulGet(null, null);
+	public void testPutStreamTransientSuccessfulGet1() throws IOException, InterruptedException {
+		testPutStreamTransientSuccessfulGet(null, null);
 	}
 
 	@Test
-	public void testPutStreamSuccessfulGet2() throws IOException {
-		testPutStreamSuccessfulGet(null, new JobID());
+	public void testPutStreamTransientSuccessfulGet2() throws IOException, InterruptedException {
+		testPutStreamTransientSuccessfulGet(null, new JobID());
 	}
 
 	@Test
-	public void testPutStreamSuccessfulGet3() throws IOException {
-		testPutStreamSuccessfulGet(new JobID(), new JobID());
+	public void testPutStreamTransientSuccessfulGet3() throws IOException, InterruptedException {
+		testPutStreamTransientSuccessfulGet(new JobID(), new JobID());
 	}
 
 	@Test
-	public void testPutStreamSuccessfulGet4() throws IOException {
-		testPutStreamSuccessfulGet(new JobID(), null);
+	public void testPutStreamTransientSuccessfulGet4() throws IOException, InterruptedException {
+		testPutStreamTransientSuccessfulGet(new JobID(), null);
 	}
 
 	/**
-	 * Uploads two file streams for different jobs into the server via the {@link BlobCache}. File
-	 * transfers should be successful.
+	 * Uploads two file streams for different jobs into the server via the {@link BlobCacheService}.
+	 * File transfers should be successful.
 	 *
 	 * <p>Note that high-availability uploads of streams is currently only possible at the {@link
 	 * BlobServer}.
@@ -330,15 +361,15 @@ public class BlobCachePutTest extends TestLogger {
 	 * @param jobId2
 	 * 		second job id
 	 */
-	private void testPutStreamSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2)
-			throws IOException {
+	private void testPutStreamTransientSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2)
+			throws IOException, InterruptedException {
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -348,58 +379,82 @@ public class BlobCachePutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(cache, jobId1, new ByteArrayInputStream(data), false);
+			TransientBlobKey key1a =
+				(TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data), TRANSIENT_BLOB);
 			assertNotNull(key1a);
 
-			BlobKey key1b = put(cache, jobId1, new ByteArrayInputStream(data2), false);
+			TransientBlobKey key1b = (TransientBlobKey) put(cache, jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
 			assertNotNull(key1b);
 
 			// files should be available on the server
-			verifyContents(server, jobId1, key1a, data, false);
-			verifyContents(server, jobId1, key1b, data2, false);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(cache, jobId2, new ByteArrayInputStream(data), false);
+			TransientBlobKey key2a =
+				(TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data), TRANSIENT_BLOB);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(cache, jobId2, new ByteArrayInputStream(data2), false);
+			TransientBlobKey key2b = (TransientBlobKey) put(cache, jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, false);
-			verifyContents(server, jobId1, key1b, data2, false);
-			verifyContents(server, jobId2, key2a, data, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// now verify we can access the BLOBs from the cache
+			verifyContents(cache, jobId1, key1a, data);
+			verifyContents(cache, jobId1, key1b, data2);
+			verifyContents(cache, jobId2, key2a, data);
+			verifyContents(cache, jobId2, key2b, data2);
+
+			// transient BLOBs should be deleted from the server, eventually
+			verifyDeletedEventually(server, jobId1, key1a);
+			verifyDeletedEventually(server, jobId1, key1b);
+			verifyDeletedEventually(server, jobId2, key2a);
+			verifyDeletedEventually(server, jobId2, key2b);
+
+			// the files are still there on the cache though
+			verifyContents(cache, jobId1, key1a, data);
+			verifyContents(cache, jobId1, key1b, data2);
+			verifyContents(cache, jobId2, key2a, data);
+			verifyContents(cache, jobId2, key2b, data2);
 		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testPutChunkedStreamSuccessfulGet1() throws IOException {
-		testPutChunkedStreamSuccessfulGet(null, null);
+	public void testPutChunkedStreamTransientSuccessfulGet1()
+			throws IOException, InterruptedException {
+		testPutChunkedStreamTransientSuccessfulGet(null, null);
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessfulGet2() throws IOException {
-		testPutChunkedStreamSuccessfulGet(null, new JobID());
+	public void testPutChunkedStreamTransientSuccessfulGet2()
+			throws IOException, InterruptedException {
+		testPutChunkedStreamTransientSuccessfulGet(null, new JobID());
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessfulGet3() throws IOException {
-		testPutChunkedStreamSuccessfulGet(new JobID(), new JobID());
+	public void testPutChunkedStreamTransientSuccessfulGet3()
+			throws IOException, InterruptedException {
+		testPutChunkedStreamTransientSuccessfulGet(new JobID(), new JobID());
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessfulGet4() throws IOException {
-		testPutChunkedStreamSuccessfulGet(new JobID(), null);
+	public void testPutChunkedStreamTransientSuccessfulGet4()
+			throws IOException, InterruptedException {
+		testPutChunkedStreamTransientSuccessfulGet(new JobID(), null);
 	}
 
 	/**
 	 * Uploads two chunked file streams for different jobs into the server via the {@link
-	 * BlobCache}. File transfers should be successful.
+	 * BlobCacheService}. File transfers should be successful.
 	 *
 	 * <p>Note that high-availability uploads of streams is currently only possible at the {@link
 	 * BlobServer}.
@@ -409,15 +464,16 @@ public class BlobCachePutTest extends TestLogger {
 	 * @param jobId2
 	 * 		second job id
 	 */
-	private void testPutChunkedStreamSuccessfulGet(
-			@Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException {
+	private void testPutChunkedStreamTransientSuccessfulGet(
+			@Nullable JobID jobId1, @Nullable JobID jobId2)
+			throws IOException, InterruptedException {
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -427,30 +483,52 @@ public class BlobCachePutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(cache, jobId1, new ChunkedInputStream(data, 19), false);
+			TransientBlobKey key1a =
+				(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
 			assertNotNull(key1a);
 
-			BlobKey key1b = put(cache, jobId1, new ChunkedInputStream(data2, 19), false);
+			TransientBlobKey key1b =
+				(TransientBlobKey) put(cache, jobId1, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
 			assertNotNull(key1b);
 
 			// files should be available on the server
-			verifyContents(server, jobId1, key1a, data, false);
-			verifyContents(server, jobId1, key1b, data2, false);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(cache, jobId2, new ChunkedInputStream(data, 19), false);
+			TransientBlobKey key2a =
+				(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data, 19), TRANSIENT_BLOB);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(cache, jobId2, new ChunkedInputStream(data2, 19), false);
+			TransientBlobKey key2b =
+				(TransientBlobKey) put(cache, jobId2, new ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, false);
-			verifyContents(server, jobId1, key1b, data2, false);
-			verifyContents(server, jobId2, key2a, data, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// now verify we can access the BLOBs from the cache
+			verifyContents(cache, jobId1, key1a, data);
+			verifyContents(cache, jobId1, key1b, data2);
+			verifyContents(cache, jobId2, key2a, data);
+			verifyContents(cache, jobId2, key2b, data2);
+
+			// transient BLOBs should be deleted from the server, eventually
+			verifyDeletedEventually(server, jobId1, key1a);
+			verifyDeletedEventually(server, jobId1, key1b);
+			verifyDeletedEventually(server, jobId2, key2a);
+			verifyDeletedEventually(server, jobId2, key2b);
+
+			// the files are still there on the cache though
+			verifyContents(cache, jobId1, key1a, data);
+			verifyContents(cache, jobId1, key1b, data2);
+			verifyContents(cache, jobId2, key2a, data);
+			verifyContents(cache, jobId2, key2b, data2);
 		}
 	}
 
@@ -458,31 +536,29 @@ public class BlobCachePutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsNoJob() throws IOException {
-		testPutBufferFails(null, false);
+		testPutBufferFails(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsForJob() throws IOException {
-		testPutBufferFails(new JobID(), false);
+		testPutBufferFails(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsForJobHa() throws IOException {
-		testPutBufferFails(new JobID(), true);
+		testPutBufferFails(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
-	 * Uploads a byte array to a server which cannot create any files via the {@link BlobCache}.
-	 * File transfers should fail.
+	 * Uploads a byte array to a server which cannot create any files via the {@link
+	 * BlobCacheService}. File transfers should fail.
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>, via {@link
-	 * 		BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
-	 * 		(<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFails(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFails(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -492,7 +568,7 @@ public class BlobCachePutTest extends TestLogger {
 		File tempFileDir = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -510,7 +586,7 @@ public class BlobCachePutTest extends TestLogger {
 			exception.expect(IOException.class);
 			exception.expectMessage("PUT operation failed: ");
 
-			put(cache, jobId, data, highAvailability);
+			put(cache, jobId, data, blobType);
 
 		} finally {
 			// set writable again to make sure we can remove the directory
@@ -523,31 +599,29 @@ public class BlobCachePutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsIncomingNoJob() throws IOException {
-		testPutBufferFailsIncoming(null, false);
+		testPutBufferFailsIncoming(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsIncomingForJob() throws IOException {
-		testPutBufferFailsIncoming(new JobID(), false);
+		testPutBufferFailsIncoming(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsIncomingForJobHa() throws IOException {
-		testPutBufferFailsIncoming(new JobID(), true);
+		testPutBufferFailsIncoming(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
 	 * Uploads a byte array to a server which cannot create incoming files via the {@link
-	 * BlobCache}. File transfers should fail.
+	 * BlobCacheService}. File transfers should fail.
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>, via {@link
-	 * 		BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
-	 * 		(<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -557,7 +631,7 @@ public class BlobCachePutTest extends TestLogger {
 		File tempFileDir = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
@@ -576,7 +650,7 @@ public class BlobCachePutTest extends TestLogger {
 			exception.expectMessage("PUT operation failed: ");
 
 			try {
-				put(cache, jobId, data, highAvailability);
+				put(cache, jobId, data, blobType);
 			} finally {
 				File storageDir = tempFileDir.getParentFile();
 				// only the incoming directory should exist (no job directory!)
@@ -593,31 +667,29 @@ public class BlobCachePutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsStoreNoJob() throws IOException {
-		testPutBufferFailsStore(null, false);
+		testPutBufferFailsStore(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsStoreForJob() throws IOException {
-		testPutBufferFailsStore(new JobID(), false);
+		testPutBufferFailsStore(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsStoreForJobHa() throws IOException {
-		testPutBufferFailsStore(new JobID(), true);
+		testPutBufferFailsStore(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
-	 * Uploads a byte array to a server which cannot create files via the {@link BlobCache}. File
-	 * transfers should fail.
+	 * Uploads a byte array to a server which cannot create files via the {@link BlobCacheService}.
+	 * File transfers should fail.
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>, via {@link
-	 * 		BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}) or not
-	 * 		(<tt>false</tt>, via {@link TransientBlobCache#put(JobID, byte[])}
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFailsStore(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -627,13 +699,14 @@ public class BlobCachePutTest extends TestLogger {
 		File jobStoreDir = null;
 		try (
 			BlobServer server = new BlobServer(config, new VoidBlobStore());
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
 
 			// make sure the blob server cannot create any files in its storage dir
-			jobStoreDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+			jobStoreDir = server.getStorageLocation(jobId,
+				BlobKey.createKey(blobType)).getParentFile();
 			assertTrue(jobStoreDir.setExecutable(true, false));
 			assertTrue(jobStoreDir.setReadable(true, false));
 			assertTrue(jobStoreDir.setWritable(false, false));
@@ -646,7 +719,7 @@ public class BlobCachePutTest extends TestLogger {
 			exception.expectMessage("PUT operation failed: ");
 
 			try {
-				put(cache, jobId, data, highAvailability);
+				put(cache, jobId, data, blobType);
 			} finally {
 				// there should be no remaining incoming files
 				File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -666,17 +739,17 @@ public class BlobCachePutTest extends TestLogger {
 
 	@Test
 	public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(null, false);
+		testConcurrentPutOperations(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(new JobID(), false);
+		testConcurrentPutOperations(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(new JobID(), true);
+		testConcurrentPutOperations(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -686,11 +759,11 @@ public class BlobCachePutTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
-	 * @param highAvailability
-	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testConcurrentPutOperations(
-			@Nullable final JobID jobId, final boolean highAvailability)
+			@Nullable final JobID jobId, final BlobKey.BlobType blobType)
 			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -705,7 +778,7 @@ public class BlobCachePutTest extends TestLogger {
 		final byte[] data = new byte[dataSize];
 
 		final List<Path> jars;
-		if (highAvailability) {
+		if (blobType == PERMANENT_BLOB) {
 			// implement via JAR file upload instead:
 			File tmpFile = temporaryFolder.newFile();
 			FileUtils.writeByteArrayToFile(tmpFile, data);
@@ -720,7 +793,7 @@ public class BlobCachePutTest extends TestLogger {
 
 		try (
 			final BlobServer server = new BlobServer(config, blobStoreServer);
-			final BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			final BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, blobStoreCache)) {
 
 			server.start();
@@ -732,17 +805,17 @@ public class BlobCachePutTest extends TestLogger {
 
 			for (int i = 0; i < concurrentPutOperations; i++) {
 				final Supplier<BlobKey> callable;
-				if (highAvailability) {
+				if (blobType == PERMANENT_BLOB) {
 					// cannot use a blocking stream here (upload only possible via files)
 					callable =
 						() -> {
 							try {
-								List<BlobKey> keys =
+								List<PermanentBlobKey> keys =
 									BlobClient.uploadJarFiles(serverAddress, config, jobId, jars);
 								assertEquals(1, keys.size());
 								BlobKey uploadedKey = keys.get(0);
 								// check the uploaded file's contents (concurrently)
-								verifyContents(server, jobId, uploadedKey, data, true);
+								verifyContents(server, jobId, uploadedKey, data);
 								return uploadedKey;
 							} catch (IOException e) {
 								throw new CompletionException(new FlinkException(
@@ -756,9 +829,9 @@ public class BlobCachePutTest extends TestLogger {
 							try {
 								BlockingInputStream inputStream =
 									new BlockingInputStream(countDownLatch, data);
-								BlobKey uploadedKey = put(cache, jobId, inputStream, false);
+								BlobKey uploadedKey = put(cache, jobId, inputStream, blobType);
 								// check the uploaded file's contents (concurrently)
-								verifyContents(server, jobId, uploadedKey, data, false);
+								verifyContents(server, jobId, uploadedKey, data);
 								return uploadedKey;
 							} catch (IOException e) {
 								throw new CompletionException(new FlinkException(
@@ -789,10 +862,10 @@ public class BlobCachePutTest extends TestLogger {
 			}
 
 			// check the uploaded file's contents
-			verifyContents(server, jobId, blobKey, data, highAvailability);
+			verifyContents(server, jobId, blobKey, data);
 
 			// check that we only uploaded the file once to the blob store
-			if (highAvailability) {
+			if (blobType == PERMANENT_BLOB) {
 				verify(blobStoreServer, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
 			} else {
 				// can't really verify much in the other cases other than that the put operations should
@@ -805,4 +878,27 @@ public class BlobCachePutTest extends TestLogger {
 			executor.shutdownNow();
 		}
 	}
+
+	/**
+	 * Checks that the given blob will be deleted at the {@link BlobServer} eventually (waits at most 30s).
+	 *
+	 * @param server
+	 * 		BLOB server
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key
+	 * 		key identifying the BLOB to request
+	 */
+	static void verifyDeletedEventually(BlobServer server, @Nullable JobID jobId, BlobKey key)
+			throws IOException, InterruptedException {
+
+		long deadline = System.currentTimeMillis() + 30_000L;
+		do {
+			Thread.sleep(10);
+		}
+		while (checkFilesExist(jobId, Collections.singletonList(key), server, false) != 0 &&
+			System.currentTimeMillis() < deadline);
+
+		verifyDeleted(server, jobId, key);
+	}
 }