You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:07:04 UTC

[11/14] flink git commit: [FLINK-7068][blob] Introduce permanent and transient BLOB keys

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 9f462c2..1a3f161 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -37,15 +36,19 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests for the recovery of files of a {@link BlobCache} from a HA store.
+ * Tests for the recovery of files of a {@link BlobCacheService} from a HA store.
  */
 public class BlobCacheRecoveryTest extends TestLogger {
 
@@ -60,7 +63,6 @@ public class BlobCacheRecoveryTest extends TestLogger {
 	public void testBlobCacheRecovery() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -80,9 +82,9 @@ public class BlobCacheRecoveryTest extends TestLogger {
 	/**
 	 * Helper to test that the {@link BlobServer} recovery from its HA store works.
 	 *
-	 * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link BlobCache} and expects a second
-	 * {@link BlobCache} to be able to retrieve them from a second {@link BlobServer} that is
-	 * configured with the same HA store.
+	 * <p>Uploads two BLOBs to one {@link BlobServer} via a {@link BlobCacheService} and expects a
+	 * second {@link BlobCacheService} to be able to retrieve them from a second {@link BlobServer}
+	 * that is configured with the same HA store.
 	 *
 	 * @param config
 	 * 		blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
@@ -104,10 +106,10 @@ public class BlobCacheRecoveryTest extends TestLogger {
 			BlobServer server0 = new BlobServer(config, blobStore);
 			BlobServer server1 = new BlobServer(config, blobStore);
 			// use VoidBlobStore as the HA store to force download from each server's HA store
-			BlobCache cache0 = new BlobCache(
+			BlobCacheService cache0 = new BlobCacheService(
 				new InetSocketAddress("localhost", server0.getPort()), config,
 				new VoidBlobStore());
-			BlobCache cache1 = new BlobCache(
+			BlobCacheService cache1 = new BlobCacheService(
 				new InetSocketAddress("localhost", server1.getPort()), config,
 				new VoidBlobStore())) {
 
@@ -124,13 +126,15 @@ public class BlobCacheRecoveryTest extends TestLogger {
 
 			// Put job-related HA data
 			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
-			keys[0] = put(cache0, jobId[0], expected, true); // Request 1
-			keys[1] = put(cache0, jobId[1], expected2, true); // Request 2
+			keys[0] = put(cache0, jobId[0], expected, PERMANENT_BLOB); // Request 1
+			keys[1] = put(cache0, jobId[1], expected2, PERMANENT_BLOB); // Request 2
 
 			// put non-HA data
-			nonHAKey = put(cache0, jobId[0], expected2, false);
+			nonHAKey = put(cache0, jobId[0], expected2, TRANSIENT_BLOB);
 			assertNotEquals(keys[0], nonHAKey);
-			assertEquals(keys[1], nonHAKey);
+			assertThat(keys[0].getHash(), not(equalTo(nonHAKey.getHash())));
+			assertNotEquals(keys[1], nonHAKey);
+			assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");
@@ -138,11 +142,11 @@ public class BlobCacheRecoveryTest extends TestLogger {
 			assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
 
 			// Verify HA requests from cache1 (connected to server1) with no immediate access to the file
-			verifyContents(cache1, jobId[0], keys[0], expected, true);
-			verifyContents(cache1, jobId[1], keys[1], expected2, true);
+			verifyContents(cache1, jobId[0], keys[0], expected);
+			verifyContents(cache1, jobId[1], keys[1], expected2);
 
 			// Verify non-HA file is not accessible from server1
-			verifyDeleted(cache1, jobId[0], nonHAKey, true);
+			verifyDeleted(cache1, jobId[0], nonHAKey);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 9f8bfe3..80e9767 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -33,6 +33,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 import static org.junit.Assert.fail;
 
@@ -54,7 +57,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchRetries(config, new VoidBlobStore(), null, false);
+		testBlobFetchRetries(config, new VoidBlobStore(), null, TRANSIENT_BLOB);
 	}
 
 	/**
@@ -67,7 +70,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchRetries(config, new VoidBlobStore(), new JobID(), false);
+		testBlobFetchRetries(config, new VoidBlobStore(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	/**
@@ -88,7 +91,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-			testBlobFetchRetries(config, blobStoreService, new JobID(), true);
+			testBlobFetchRetries(config, blobStoreService, new JobID(), PERMANENT_BLOB);
 		} finally {
 			if (blobStoreService != null) {
 				blobStoreService.closeAndCleanupAllData();
@@ -103,25 +106,27 @@ public class BlobCacheRetriesTest extends TestLogger {
 	 * @param config
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private static void testBlobFetchRetries(
 			final Configuration config, final BlobStore blobStore, @Nullable final JobID jobId,
-			boolean highAvailability) throws IOException {
+			BlobKey.BlobType blobType) throws IOException {
 
 		final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
 		try (
 			BlobServer server = new TestingFailingBlobServer(config, blobStore, 2);
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
 
 			// upload some blob
-			final BlobKey key = BlobServerPutTest.put(server, jobId, data, highAvailability);
+			final BlobKey key = put(server, jobId, data, blobType);
 
 			// trigger a download - it should fail the first two times, but retry, and succeed eventually
-			verifyContents(cache, jobId, key, data, highAvailability);
+			verifyContents(cache, jobId, key, data);
 		}
 	}
 
@@ -135,7 +140,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null, false);
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), null, TRANSIENT_BLOB);
 	}
 
 	/**
@@ -148,7 +153,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID(), false);
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	/**
@@ -169,7 +174,7 @@ public class BlobCacheRetriesTest extends TestLogger {
 		try {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
-			testBlobFetchWithTooManyFailures(config, blobStoreService, new JobID(), true);
+			testBlobFetchWithTooManyFailures(config, blobStoreService, new JobID(), PERMANENT_BLOB);
 		} finally {
 			if (blobStoreService != null) {
 				blobStoreService.closeAndCleanupAllData();
@@ -184,26 +189,28 @@ public class BlobCacheRetriesTest extends TestLogger {
 	 * @param config
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private static void testBlobFetchWithTooManyFailures(
 			final Configuration config, final BlobStore blobStore, @Nullable final JobID jobId,
-			boolean highAvailability) throws IOException {
+			BlobKey.BlobType blobType) throws IOException {
 
 		final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
 		try (
 			BlobServer server = new TestingFailingBlobServer(config, blobStore, 0, 10);
-			BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+			BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 				config, new VoidBlobStore())) {
 
 			server.start();
 
 			// upload some blob
-			final BlobKey key = BlobServerPutTest.put(server, jobId, data, highAvailability);
+			final BlobKey key = put(server, jobId, data, blobType);
 
 			// trigger a download - it should fail eventually
 			try {
-				verifyContents(cache, jobId, key, data, highAvailability);
+				verifyContents(cache, jobId, key, data);
 				fail("This should fail");
 			}
 			catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 7a69804..fcdc714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -34,11 +34,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 
 /**
- * This class contains unit tests for the {@link BlobCache}.
+ * This class contains unit tests for the {@link BlobCacheService}.
  */
 public class BlobCacheSuccessTest extends TestLogger {
 
@@ -55,7 +57,7 @@ public class BlobCacheSuccessTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		uploadFileGetTest(config, null, false, false, false);
+		uploadFileGetTest(config, null, false, false, TRANSIENT_BLOB);
 	}
 
 	/**
@@ -68,7 +70,7 @@ public class BlobCacheSuccessTest extends TestLogger {
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		uploadFileGetTest(config, new JobID(), false, false, false);
+		uploadFileGetTest(config, new JobID(), false, false, TRANSIENT_BLOB);
 	}
 
 	/**
@@ -85,7 +87,7 @@ public class BlobCacheSuccessTest extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
 
-		uploadFileGetTest(config, new JobID(), true, true, true);
+		uploadFileGetTest(config, new JobID(), true, true, PERMANENT_BLOB);
 	}
 
 	/**
@@ -101,7 +103,7 @@ public class BlobCacheSuccessTest extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, new JobID(), false, true, true);
+		uploadFileGetTest(config, new JobID(), false, true, PERMANENT_BLOB);
 	}
 
 	/**
@@ -117,12 +119,12 @@ public class BlobCacheSuccessTest extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
 
-		uploadFileGetTest(config, new JobID(), false, false, true);
+		uploadFileGetTest(config, new JobID(), false, false, PERMANENT_BLOB);
 	}
 
 	/**
 	 * Uploads two different BLOBs to the {@link BlobServer} via a {@link BlobClient} and verifies
-	 * we can access the files from a {@link BlobCache}.
+	 * we can access the files from a {@link BlobCacheService}.
 	 *
 	 * @param config
 	 * 		configuration to use for the server and cache (the final cache's configuration will
@@ -133,12 +135,12 @@ public class BlobCacheSuccessTest extends TestLogger {
 	 * @param cacheHasAccessToFs
 	 * 		whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
 	 * 		HA mode)
-	 * @param highAvailability
-	 * 		whether to use HA BLOB upload and download methods
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void uploadFileGetTest(
 			final Configuration config, @Nullable JobID jobId, boolean shutdownServerAfterUpload,
-			boolean cacheHasAccessToFs, boolean highAvailability) throws IOException {
+			boolean cacheHasAccessToFs, BlobKey.BlobType blobType) throws IOException {
 
 		final Configuration cacheConfig = new Configuration(config);
 		cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
@@ -162,29 +164,29 @@ public class BlobCacheSuccessTest extends TestLogger {
 			blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig);
 			try (
 				BlobServer server = new BlobServer(config, blobStoreService);
-				BlobCache cache = new BlobCache(new InetSocketAddress("localhost", server.getPort()),
+				BlobCacheService cache = new BlobCacheService(new InetSocketAddress("localhost", server.getPort()),
 					cacheConfig, blobStoreService)) {
 
 				server.start();
 
 				// Upload BLOBs
-				BlobKey key1 = put(server, jobId, data, highAvailability);
-				BlobKey key2 = put(server, jobId, data2, highAvailability);
+				BlobKey key1 = put(server, jobId, data, blobType);
+				BlobKey key2 = put(server, jobId, data2, blobType);
 
 				if (shutdownServerAfterUpload) {
 					// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
 					server.close();
 				}
 
-				verifyContents(cache, jobId, key1, data, highAvailability);
-				verifyContents(cache, jobId, key2, data2, highAvailability);
+				verifyContents(cache, jobId, key1, data);
+				verifyContents(cache, jobId, key2, data2);
 
 				if (shutdownServerAfterUpload) {
 					// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
 					server.close();
 
-					verifyContents(cache, jobId, key1, data, highAvailability);
-					verifyContents(cache, jobId, key2, data2, highAvailability);
+					verifyContents(cache, jobId, key1, data);
+					verifyContents(cache, jobId, key2, data2);
 				}
 			}
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index c4444c8..0e97604 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -44,6 +44,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -105,16 +108,20 @@ public class BlobClientTest extends TestLogger {
 	}
 
 	/**
-	 * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and
-	 * computes the file's BLOB key.
+	 * Prepares a test file for the unit tests, i.e. the method fills the file with a particular
+	 * byte pattern and computes the file's BLOB key.
 	 *
 	 * @param file
-	 *        the file to prepare for the unit tests
+	 * 		the file to prepare for the unit tests
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
+	 *
 	 * @return the BLOB key of the prepared file
+	 *
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing to the test file
+	 * 		thrown if an I/O error occurs while writing to the test file
 	 */
-	private static BlobKey prepareTestFile(File file) throws IOException {
+	private static BlobKey prepareTestFile(File file, BlobKey.BlobType blobType) throws IOException {
 
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -138,7 +145,7 @@ public class BlobClientTest extends TestLogger {
 			}
 		}
 
-		return new BlobKey(md.digest());
+		return BlobKey.createKey(blobType, md.digest());
 	}
 
 	/**
@@ -224,29 +231,30 @@ public class BlobClientTest extends TestLogger {
 	}
 
 	@Test
-	public void testContentAddressableBufferTransientBlob() throws IOException {
-		testContentAddressableBuffer(false);
+	public void testContentAddressableBufferTransientBlob() throws IOException, InterruptedException {
+		testContentAddressableBuffer(TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testContentAddressableBufferPermantBlob() throws IOException {
-		testContentAddressableBuffer(true);
+	public void testContentAddressableBufferPermantBlob() throws IOException, InterruptedException {
+		testContentAddressableBuffer(PERMANENT_BLOB);
 	}
 
 	/**
 	 * Tests the PUT/GET operations for content-addressable buffers.
 	 *
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testContentAddressableBuffer(boolean permanentBlob) throws IOException {
+	private void testContentAddressableBuffer(BlobKey.BlobType blobType)
+			throws IOException, InterruptedException {
 		BlobClient client = null;
 
 		try {
 			byte[] testBuffer = createTestBuffer();
 			MessageDigest md = BlobUtils.createMessageDigest();
 			md.update(testBuffer);
-			BlobKey origKey = new BlobKey(md.digest());
+			BlobKey origKey = BlobKey.createKey(blobType, md.digest());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
@@ -255,24 +263,30 @@ public class BlobClientTest extends TestLogger {
 			BlobKey receivedKey;
 
 			// Store the data (job-unrelated)
-			if (!permanentBlob) {
-				receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, false);
+			if (blobType == TRANSIENT_BLOB) {
+				receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
 				assertEquals(origKey, receivedKey);
 			}
 
 			// try again with a job-related BLOB:
-			receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, permanentBlob);
+			receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
 			assertEquals(origKey, receivedKey);
 
 			// Retrieve the data (job-unrelated)
-			if (!permanentBlob) {
-				validateGetAndClose(client.getInternal(null, receivedKey, false), testBuffer);
+			if (blobType == TRANSIENT_BLOB) {
+				validateGetAndClose(client.getInternal(null, receivedKey), testBuffer);
+				// transient BLOBs should be deleted from the server, eventually
+				verifyDeletedEventually(getBlobServer(), null, receivedKey);
 			}
 			// job-related
-			validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testBuffer);
+			validateGetAndClose(client.getInternal(jobId, receivedKey), testBuffer);
+			if (blobType == TRANSIENT_BLOB) {
+				// transient BLOBs should be deleted from the server, eventually
+				verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+			}
 
 			// Check reaction to invalid keys for job-unrelated blobs
-			try (InputStream ignored = client.getInternal(null, new BlobKey(), permanentBlob)) {
+			try (InputStream ignored = client.getInternal(null, BlobKey.createKey(blobType))) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -282,7 +296,7 @@ public class BlobClientTest extends TestLogger {
 			// Check reaction to invalid keys for job-related blobs
 			// new client needed (closed from failure above)
 			client = new BlobClient(serverAddress, getBlobClientConfig());
-			try (InputStream ignored = client.getInternal(jobId, new BlobKey(), permanentBlob)) {
+			try (InputStream ignored = client.getInternal(jobId, BlobKey.createKey(blobType))) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -307,25 +321,28 @@ public class BlobClientTest extends TestLogger {
 	}
 
 	@Test
-	public void testContentAddressableStreamTransientBlob() throws IOException {
-		testContentAddressableStream(false);
+	public void testContentAddressableStreamTransientBlob()
+			throws IOException, InterruptedException {
+		testContentAddressableStream(TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testContentAddressableStreamPermanentBlob() throws IOException {
-		testContentAddressableStream(true);
+	public void testContentAddressableStreamPermanentBlob()
+			throws IOException, InterruptedException {
+		testContentAddressableStream(PERMANENT_BLOB);
 	}
 
 	/**
 	 * Tests the PUT/GET operations for content-addressable streams.
 	 *
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testContentAddressableStream(boolean permanentBlob) throws IOException {
+	private void testContentAddressableStream(BlobKey.BlobType blobType)
+			throws IOException, InterruptedException {
 
 		File testFile = temporaryFolder.newFile();
-		BlobKey origKey = prepareTestFile(testFile);
+		BlobKey origKey = prepareTestFile(testFile, blobType);
 
 		InputStream is = null;
 
@@ -335,26 +352,32 @@ public class BlobClientTest extends TestLogger {
 			BlobKey receivedKey;
 
 			// Store the data (job-unrelated)
-			if (!permanentBlob) {
+			if (blobType == TRANSIENT_BLOB) {
 				is = new FileInputStream(testFile);
-				receivedKey = client.putInputStream(null, is, false);
+				receivedKey = client.putInputStream(null, is, blobType);
 				assertEquals(origKey, receivedKey);
 			}
 
 			// try again with a job-related BLOB:
 			is = new FileInputStream(testFile);
-			receivedKey = client.putInputStream(jobId, is, permanentBlob);
+			receivedKey = client.putInputStream(jobId, is, blobType);
 			assertEquals(origKey, receivedKey);
 
 			is.close();
 			is = null;
 
 			// Retrieve the data (job-unrelated)
-			if (!permanentBlob) {
-				validateGetAndClose(client.getInternal(null, receivedKey, false), testFile);
+			if (blobType == TRANSIENT_BLOB) {
+				validateGetAndClose(client.getInternal(null, receivedKey), testFile);
+				// transient BLOBs should be deleted from the server, eventually
+				verifyDeletedEventually(getBlobServer(), null, receivedKey);
 			}
 			// job-related
-			validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testFile);
+			validateGetAndClose(client.getInternal(jobId, receivedKey), testFile);
+			if (blobType == TRANSIENT_BLOB) {
+				// transient BLOBs should be deleted from the server, eventually
+				verifyDeletedEventually(getBlobServer(), jobId, receivedKey);
+			}
 		} finally {
 			if (is != null) {
 				try {
@@ -366,17 +389,17 @@ public class BlobClientTest extends TestLogger {
 
 	@Test
 	public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
-		testGetFailsDuringStreaming(null, false);
+		testGetFailsDuringStreaming(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
-		testGetFailsDuringStreaming(new JobID(), false);
+		testGetFailsDuringStreaming(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
-		testGetFailsDuringStreaming(new JobID(), true);
+		testGetFailsDuringStreaming(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -384,10 +407,10 @@ public class BlobClientTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testGetFailsDuringStreaming(@Nullable final JobID jobId, boolean permanentBlob)
+	private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 
 		try (BlobClient client = new BlobClient(
@@ -398,11 +421,11 @@ public class BlobClientTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.putBuffer(jobId, data, 0, data.length, permanentBlob);
+			BlobKey key = client.putBuffer(jobId, data, 0, data.length, blobType);
 			assertNotNull(key);
 
 			// issue a GET request that succeeds
-			InputStream is = client.getInternal(jobId, key, permanentBlob);
+			InputStream is = client.getInternal(jobId, key);
 
 			byte[] receiveBuffer = new byte[data.length];
 			int firstChunkLen = 50000;
@@ -440,7 +463,7 @@ public class BlobClientTest extends TestLogger {
 	static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
 		final File testFile = File.createTempFile("testfile", ".dat");
 		testFile.deleteOnExit();
-		prepareTestFile(testFile);
+		prepareTestFile(testFile, PERMANENT_BLOB);
 
 		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 
@@ -452,13 +475,13 @@ public class BlobClientTest extends TestLogger {
 			final InetSocketAddress serverAddress, final Configuration blobClientConfig,
 			final File testFile) throws IOException {
 		JobID jobId = new JobID();
-		List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
+		List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
 			jobId, Collections.singletonList(new Path(testFile.toURI())));
 
 		assertEquals(1, blobKeys.size());
 
 		try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
-			validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0), true), testFile);
+			validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 3797f87..49f4fc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -25,7 +25,16 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -54,48 +63,119 @@ public final class BlobKeyTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testCreateKey() {
+		BlobKey key = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+		verifyType(PERMANENT_BLOB, key);
+		assertArrayEquals(KEY_ARRAY_1, key.getHash());
+
+		key = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+		verifyType(TRANSIENT_BLOB, key);
+		assertArrayEquals(KEY_ARRAY_1, key.getHash());
+
+	}
+
+	@Test
+	public void testSerializationTransient() throws Exception {
+		testSerialization(TRANSIENT_BLOB);
+	}
+
+	@Test
+	public void testSerializationPermanent() throws Exception {
+		testSerialization(PERMANENT_BLOB);
+	}
+
 	/**
 	 * Tests the serialization/deserialization of BLOB keys.
 	 */
-	@Test
-	public void testSerialization() throws Exception {
-		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+	private void testSerialization(BlobKey.BlobType blobType) throws Exception {
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
 		final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
 		assertEquals(k1, k2);
 		assertEquals(k1.hashCode(), k2.hashCode());
 		assertEquals(0, k1.compareTo(k2));
 	}
 
+	@Test
+	public void testEqualsTransient() {
+		testEquals(TRANSIENT_BLOB);
+	}
+
+	@Test
+	public void testEqualsPermanent() {
+		testEquals(PERMANENT_BLOB);
+	}
+
 	/**
 	 * Tests the equals method.
 	 */
-	@Test
-	public void testEquals() {
-		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
-		final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
-		final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
+	private void testEquals(BlobKey.BlobType blobType) {
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
 		assertTrue(k1.equals(k2));
+		assertTrue(k2.equals(k1));
 		assertFalse(k1.equals(k3));
+		assertFalse(k3.equals(k1));
+	}
+
+	/**
+	 * Tests that keys with the same hash but different BLOB types are not equal.
+	 */
+	@Test
+	public void testEqualsDifferentBlobType() {
+		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+		assertFalse(k1.equals(k2));
+		assertFalse(k2.equals(k1));
+	}
+
+	@Test
+	public void testComparesTransient() {
+		testCompares(TRANSIENT_BLOB);
+	}
+
+	@Test
+	public void testComparesPermanent() {
+		testCompares(PERMANENT_BLOB);
 	}
 
 	/**
 	 * Tests the compares method.
 	 */
+	private void testCompares(BlobKey.BlobType blobType) {
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+		final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+		assertThat(k1.compareTo(k2), is(0));
+		assertThat(k2.compareTo(k1), is(0));
+		assertThat(k1.compareTo(k3), lessThan(0));
+		assertThat(k3.compareTo(k1), greaterThan(0));
+	}
+
+	@Test
+	public void testComparesDifferentBlobType() {
+		final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, KEY_ARRAY_1);
+		final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, KEY_ARRAY_1);
+		assertThat(k1.compareTo(k2), greaterThan(0));
+		assertThat(k2.compareTo(k1), lessThan(0));
+	}
+
+	@Test
+	public void testStreamsTransient() throws Exception {
+		testStreams(TRANSIENT_BLOB);
+	}
+
 	@Test
-	public void testCompares() {
-		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
-		final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
-		final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
-		assertTrue(k1.compareTo(k2) == 0);
-		assertTrue(k1.compareTo(k3) < 0);
+	public void testStreamsPermanent() throws Exception {
+		testStreams(PERMANENT_BLOB);
 	}
 
 	/**
 	 * Test the serialization/deserialization using input/output streams.
 	 */
-	@Test
-	public void testStreams() throws Exception {
-		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+	private void testStreams(BlobKey.BlobType blobType) throws IOException {
+		final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
 
 		k1.writeToOutputStream(baos);
@@ -106,4 +186,18 @@ public final class BlobKeyTest extends TestLogger {
 
 		assertEquals(k1, k2);
 	}
+
+	/**
+	 * Verifies that the given <tt>key</tt> is of an expected type.
+	 *
+	 * @param expected the type the key should have
+	 * @param key      the key to verify
+	 */
+	static void verifyType(BlobKey.BlobType expected, BlobKey key) {
+		if (expected == PERMANENT_BLOB) {
+			assertThat(key, is(instanceOf(PermanentBlobKey.class)));
+		} else {
+			assertThat(key, is(instanceOf(TransientBlobKey.class)));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
index c4c6762..56f193b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.TestLogger;
 
@@ -37,6 +36,7 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.junit.Assert.assertNotNull;
@@ -64,7 +64,6 @@ public class BlobServerCorruptionTest extends TestLogger {
 
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -108,7 +107,7 @@ public class BlobServerCorruptionTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = put(server, jobId, data, true);
+			BlobKey key = put(server, jobId, data, PERMANENT_BLOB);
 			assertNotNull(key);
 
 			// delete local file to make sure that the GET requests downloads from HA
@@ -131,7 +130,7 @@ public class BlobServerCorruptionTest extends TestLogger {
 			expectedException.expect(IOException.class);
 			expectedException.expectMessage("data corruption");
 
-			get(server, jobId, key, true);
+			get(server, jobId, key);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index bb977d3..a110d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -45,6 +45,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
@@ -95,6 +97,7 @@ public class BlobServerDeleteTest extends TestLogger {
 	 */
 	private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
 			throws IOException {
+		final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null && jobId1.equals(jobId2));
 
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -109,35 +112,34 @@ public class BlobServerDeleteTest extends TestLogger {
 			data2[0] ^= 1;
 
 			// put first BLOB
-			BlobKey key1 = put(server, jobId1, data, false);
+			TransientBlobKey key1 = (TransientBlobKey) put(server, jobId1, data, TRANSIENT_BLOB);
 			assertNotNull(key1);
 
 			// put two more BLOBs (same key, other key) for another job ID
-			BlobKey key2a = put(server, jobId2, data, false);
+			TransientBlobKey key2a = (TransientBlobKey) put(server, jobId2, data, TRANSIENT_BLOB);
 			assertNotNull(key2a);
 			assertEquals(key1, key2a);
-			BlobKey key2b = put(server, jobId2, data2, false);
+			TransientBlobKey key2b = (TransientBlobKey) put(server, jobId2, data2, TRANSIENT_BLOB);
 			assertNotNull(key2b);
 
 			// issue a DELETE request
 			assertTrue(delete(server, jobId1, key1));
 
-			verifyDeleted(server, jobId1, key1, false);
+			verifyDeleted(server, jobId1, key1);
 			// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
-			if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
-				verifyContents(server, jobId2, key2a, data, false);
+			if (!sameJobId) {
+				verifyContents(server, jobId2, key2a, data);
 			}
-			verifyContents(server, jobId2, key2b, data2, false);
+			verifyContents(server, jobId2, key2b, data2);
 
 			// delete first file of second job
 			assertTrue(delete(server, jobId2, key2a));
-			verifyDeleted(server, jobId2, key2a, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			verifyDeleted(server, jobId2, key2a);
+			verifyContents(server, jobId2, key2b, data2);
 
 			// delete second file of second job
-			assertTrue(delete(server, jobId2, key2a));
-			verifyDeleted(server, jobId2, key2a, false);
-			verifyContents(server, jobId2, key2b, data2, false);
+			assertTrue(delete(server, jobId2, key2b));
+			verifyDeleted(server, jobId2, key2b);
 		}
 	}
 
@@ -171,7 +173,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put BLOB
-			BlobKey key = put(server, jobId, data, false);
+			TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 			assertNotNull(key);
 
 			File blobFile = server.getStorageLocation(jobId, key);
@@ -179,11 +181,11 @@ public class BlobServerDeleteTest extends TestLogger {
 
 			// DELETE operation should not fail if file is already deleted
 			assertTrue(delete(server, jobId, key));
-			verifyDeleted(server, jobId, key, false);
+			verifyDeleted(server, jobId, key);
 
 			// one more delete call that should not fail
 			assertTrue(delete(server, jobId, key));
-			verifyDeleted(server, jobId, key, false);
+			verifyDeleted(server, jobId, key);
 		}
 	}
 
@@ -223,7 +225,7 @@ public class BlobServerDeleteTest extends TestLogger {
 				rnd.nextBytes(data);
 
 				// put BLOB
-				BlobKey key = put(server, jobId, data, false);
+				TransientBlobKey key = (TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 				assertNotNull(key);
 
 				blobFile = server.getStorageLocation(jobId, key);
@@ -236,7 +238,7 @@ public class BlobServerDeleteTest extends TestLogger {
 				assertFalse(delete(server, jobId, key));
 
 				// the file should still be there
-				verifyContents(server, jobId, key, data, false);
+				verifyContents(server, jobId, key, data);
 			} finally {
 				if (blobFile != null && directory != null) {
 					//noinspection ResultOfMethodCallIgnored
@@ -250,21 +252,21 @@ public class BlobServerDeleteTest extends TestLogger {
 
 	@Test
 	public void testJobCleanup() throws IOException, InterruptedException {
-		testJobCleanup(false);
+		testJobCleanup(TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testJobCleanupHa() throws IOException, InterruptedException {
-		testJobCleanup(true);
+		testJobCleanup(PERMANENT_BLOB);
 	}
 
 	/**
 	 * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
 	 *
-	 * @param highAvailability
-	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testJobCleanup(boolean highAvailability) throws IOException {
+	private void testJobCleanup(BlobKey.BlobType blobType) throws IOException {
 		JobID jobId1 = new JobID();
 		JobID jobId2 = new JobID();
 
@@ -280,31 +282,31 @@ public class BlobServerDeleteTest extends TestLogger {
 			byte[] data2 = Arrays.copyOf(data, data.length);
 			data2[0] ^= 1;
 
-			BlobKey key1a = put(server, jobId1, data, highAvailability);
-			BlobKey key2 = put(server, jobId2, data, highAvailability);
+			BlobKey key1a = put(server, jobId1, data, blobType);
+			BlobKey key2 = put(server, jobId2, data, blobType);
 			assertEquals(key1a, key2);
 
-			BlobKey key1b = put(server, jobId1, data2, highAvailability);
+			BlobKey key1b = put(server, jobId1, data2, blobType);
 
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 			checkFileCountForJob(2, jobId1, server);
 
-			verifyContents(server, jobId2, key2, data, highAvailability);
+			verifyContents(server, jobId2, key2, data);
 			checkFileCountForJob(1, jobId2, server);
 
 			server.cleanupJob(jobId1);
 
-			verifyDeleted(server, jobId1, key1a, highAvailability);
-			verifyDeleted(server, jobId1, key1b, highAvailability);
+			verifyDeleted(server, jobId1, key1a);
+			verifyDeleted(server, jobId1, key1b);
 			checkFileCountForJob(0, jobId1, server);
-			verifyContents(server, jobId2, key2, data, highAvailability);
+			verifyContents(server, jobId2, key2, data);
 			checkFileCountForJob(1, jobId2, server);
 
 			server.cleanupJob(jobId2);
 
 			checkFileCountForJob(0, jobId1, server);
-			verifyDeleted(server, jobId2, key2, highAvailability);
+			verifyDeleted(server, jobId2, key2);
 			checkFileCountForJob(0, jobId2, server);
 
 			// calling a second time should not fail
@@ -313,12 +315,14 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsNoJobTransient()
+			throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(null);
 	}
 
 	@Test
-	public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsForJobTransient()
+			throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(new JobID());
 	}
 
@@ -350,7 +354,8 @@ public class BlobServerDeleteTest extends TestLogger {
 
 			server.start();
 
-			final BlobKey blobKey = put(server, jobId, data, false);
+			final TransientBlobKey blobKey =
+				(TransientBlobKey) put(server, jobId, data, TRANSIENT_BLOB);
 
 			assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 
@@ -396,11 +401,11 @@ public class BlobServerDeleteTest extends TestLogger {
 	 *
 	 * @return delete success
 	 */
-	static boolean delete(BlobService service, @Nullable JobID jobId, BlobKey key) {
+	static boolean delete(BlobService service, @Nullable JobID jobId, TransientBlobKey key) {
 		if (jobId == null) {
-			return service.getTransientBlobStore().delete(key);
+			return service.getTransientBlobService().deleteFromCache(key);
 		} else {
-			return service.getTransientBlobStore().delete(jobId, key);
+			return service.getTransientBlobService().deleteFromCache(jobId, key);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 0873aba..4927279 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.FlinkException;
@@ -57,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
 import static org.junit.Assert.assertArrayEquals;
@@ -73,7 +74,8 @@ import static org.mockito.Mockito.mock;
  * Tests how failing GET requests behave in the presence of failures when used with a {@link
  * BlobServer}.
  *
- * <p>Successful GET requests are tested in conjunction wit the PUT requests.
+ * <p>Successful GET requests are tested in conjunction with the PUT requests by {@link
+ * BlobServerPutTest}.
  */
 public class BlobServerGetTest extends TestLogger {
 
@@ -86,23 +88,23 @@ public class BlobServerGetTest extends TestLogger {
 	public final ExpectedException exception = ExpectedException.none();
 
 	@Test
-	public void testGetFailsDuringLookup1() throws IOException {
-		testGetFailsDuringLookup(null, new JobID(), false);
+	public void testGetTransientFailsDuringLookup1() throws IOException {
+		testGetFailsDuringLookup(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookup2() throws IOException {
-		testGetFailsDuringLookup(new JobID(), new JobID(), false);
+	public void testGetTransientFailsDuringLookup2() throws IOException {
+		testGetFailsDuringLookup(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookup3() throws IOException {
-		testGetFailsDuringLookup(new JobID(), null, false);
+	public void testGetTransientFailsDuringLookup3() throws IOException {
+		testGetFailsDuringLookup(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
-	public void testGetFailsDuringLookupHa() throws IOException {
-		testGetFailsDuringLookup(new JobID(), new JobID(), true);
+	public void testGetPermanentFailsDuringLookup() throws IOException {
+		testGetFailsDuringLookup(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -110,9 +112,11 @@ public class BlobServerGetTest extends TestLogger {
 	 *
 	 * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
 	 * @param jobId2 second job ID different to <tt>jobId1</tt>
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testGetFailsDuringLookup(
-			@Nullable final JobID jobId1, @Nullable final JobID jobId2, boolean highAvailability)
+			@Nullable final JobID jobId1, @Nullable final JobID jobId2, BlobKey.BlobType blobType)
 			throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -125,7 +129,7 @@ public class BlobServerGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = put(server, jobId1, data, highAvailability);
+			BlobKey key = put(server, jobId1, data, blobType);
 			assertNotNull(key);
 
 			// delete file to make sure that GET requests fail
@@ -133,22 +137,22 @@ public class BlobServerGetTest extends TestLogger {
 			assertTrue(blobFile.delete());
 
 			// issue a GET request that fails
-			verifyDeleted(server, jobId1, key, highAvailability);
+			verifyDeleted(server, jobId1, key);
 
 			// add the same data under a second jobId
-			BlobKey key2 = put(server, jobId2, data, highAvailability);
+			BlobKey key2 = put(server, jobId2, data, blobType);
 			assertNotNull(key);
 			assertEquals(key, key2);
 
 			// request for jobId2 should succeed
-			get(server, jobId2, key, highAvailability);
+			get(server, jobId2, key);
 			// request for jobId1 should still fail
-			verifyDeleted(server, jobId1, key, highAvailability);
+			verifyDeleted(server, jobId1, key);
 
 			// same checks as for jobId1 but for jobId2 should also work:
 			blobFile = server.getStorageLocation(jobId2, key);
 			assertTrue(blobFile.delete());
-			verifyDeleted(server, jobId2, key, highAvailability);
+			verifyDeleted(server, jobId2, key);
 		}
 	}
 
@@ -164,7 +168,6 @@ public class BlobServerGetTest extends TestLogger {
 
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -181,7 +184,7 @@ public class BlobServerGetTest extends TestLogger {
 				// store the data on the server (and blobStore), remove from local store
 				byte[] data = new byte[2000000];
 				rnd.nextBytes(data);
-				BlobKey blobKey = put(server, jobId, data, true);
+				BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
 				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 
 				// make sure the blob server cannot create any files in its storage dir
@@ -195,7 +198,7 @@ public class BlobServerGetTest extends TestLogger {
 				exception.expectMessage("Permission denied");
 
 				try {
-					get(server, jobId, blobKey, true);
+					get(server, jobId, blobKey);
 				} finally {
 					HashSet<String> expectedDirs = new HashSet<>();
 					expectedDirs.add("incoming");
@@ -236,7 +239,6 @@ public class BlobServerGetTest extends TestLogger {
 
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -253,7 +255,7 @@ public class BlobServerGetTest extends TestLogger {
 				// store the data on the server (and blobStore), remove from local store
 				byte[] data = new byte[2000000];
 				rnd.nextBytes(data);
-				BlobKey blobKey = put(server, jobId, data, true);
+				BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
 				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 
 				// make sure the blob cache cannot create any files in its storage dir
@@ -266,7 +268,7 @@ public class BlobServerGetTest extends TestLogger {
 				exception.expect(AccessDeniedException.class);
 
 				try {
-					get(server, jobId, blobKey, true);
+					get(server, jobId, blobKey);
 				} finally {
 					// there should be no remaining incoming files
 					File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -307,7 +309,7 @@ public class BlobServerGetTest extends TestLogger {
 			// store the data on the server (and blobStore), remove from local store
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
-			BlobKey blobKey = put(server, jobId, data, true);
+			BlobKey blobKey = put(server, jobId, data, PERMANENT_BLOB);
 			assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 
 			File tempFileDir = server.createTemporaryFilename().getParentFile();
@@ -316,7 +318,7 @@ public class BlobServerGetTest extends TestLogger {
 			exception.expect(NoSuchFileException.class);
 
 			try {
-				get(server, jobId, blobKey, true);
+				get(server, jobId, blobKey);
 			} finally {
 				HashSet<String> expectedDirs = new HashSet<>();
 				expectedDirs.add("incoming");
@@ -336,17 +338,17 @@ public class BlobServerGetTest extends TestLogger {
 
 	@Test
 	public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(null, false);
+		testConcurrentGetOperations(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID(), false);
+		testConcurrentGetOperations(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID(), true);
+		testConcurrentGetOperations(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -355,11 +357,11 @@ public class BlobServerGetTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
-	 * @param highAvailability
-	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testConcurrentGetOperations(
-			@Nullable final JobID jobId, final boolean highAvailability)
+			@Nullable final JobID jobId, final BlobKey.BlobType blobType)
 			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -374,7 +376,7 @@ public class BlobServerGetTest extends TestLogger {
 		MessageDigest md = BlobUtils.createMessageDigest();
 
 		// create the correct blob key by hashing our input data
-		final BlobKey blobKey = new BlobKey(md.digest(data));
+		final BlobKey blobKey = BlobKey.createKey(blobType, md.digest(data));
 
 		doAnswer(
 			new Answer() {
@@ -396,10 +398,10 @@ public class BlobServerGetTest extends TestLogger {
 			server.start();
 
 			// upload data first
-			assertEquals(blobKey, put(server, jobId, data, highAvailability));
+			assertEquals(blobKey, put(server, jobId, data, blobType));
 
 			// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
-			if (highAvailability) {
+			if (blobType == PERMANENT_BLOB) {
 				// remove local copy so that a transfer from HA store takes place
 				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 			}
@@ -407,7 +409,7 @@ public class BlobServerGetTest extends TestLogger {
 				CompletableFuture<File> getOperation = CompletableFuture.supplyAsync(
 					() -> {
 						try {
-							File file = get(server, jobId, blobKey, highAvailability);
+							File file = get(server, jobId, blobKey);
 							// check that we have read the right data
 							validateGetAndClose(new FileInputStream(file), data);
 							return file;
@@ -431,8 +433,8 @@ public class BlobServerGetTest extends TestLogger {
 	/**
 	 * Retrieves the given blob.
 	 *
-	 * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
-	 * retrieve the blob.
+	 * <p>Note that if a {@link BlobCacheService} is used, it may try to access the {@link
+	 * BlobServer} to retrieve the blob.
 	 *
 	 * @param service
 	 * 		BLOB client to use for connecting to the BLOB service
@@ -440,26 +442,22 @@ public class BlobServerGetTest extends TestLogger {
 	 * 		job ID or <tt>null</tt> if job-unrelated
 	 * @param key
 	 * 		key identifying the BLOB to request
-	 * @param highAvailability
-	 * 		whether to check HA mode accessors
 	 */
-	static File get(
-			BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
-			throws IOException {
-		if (highAvailability) {
-			return service.getPermanentBlobStore().getHAFile(jobId, key);
+	static File get(BlobService service, @Nullable JobID jobId, BlobKey key) throws IOException {
+		if (key instanceof PermanentBlobKey) {
+			return service.getPermanentBlobService().getFile(jobId, (PermanentBlobKey) key);
 		} else if (jobId == null) {
-			return service.getTransientBlobStore().getFile(key);
+			return service.getTransientBlobService().getFile((TransientBlobKey) key);
 		} else {
-			return service.getTransientBlobStore().getFile(jobId, key);
+			return service.getTransientBlobService().getFile(jobId, (TransientBlobKey) key);
 		}
 	}
 
 	/**
 	 * Checks that the given blob does not exist anymore by trying to access it.
 	 *
-	 * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
-	 * retrieve the blob.
+	 * <p>Note that if a {@link BlobCacheService} is used, it may try to access the {@link
+	 * BlobServer} to retrieve the blob.
 	 *
 	 * @param service
 	 * 		BLOB client to use for connecting to the BLOB service
@@ -467,14 +465,11 @@ public class BlobServerGetTest extends TestLogger {
 	 * 		job ID or <tt>null</tt> if job-unrelated
 	 * @param key
 	 * 		key identifying the BLOB to request
-	 * @param highAvailability
-	 * 		whether to check HA mode accessors
 	 */
-	static void verifyDeleted(
-			BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+	static void verifyDeleted(BlobService service, @Nullable JobID jobId, BlobKey key)
 			throws IOException {
 		try {
-			get(service, jobId, key, highAvailability);
+			get(service, jobId, key);
 			fail("File " + jobId + "/" + key + " should have been deleted.");
 		} catch (IOException e) {
 			// expected

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 6b38f43..aefd0a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -60,6 +60,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -134,11 +136,15 @@ public class BlobServerPutTest extends TestLogger {
 
 			server.start();
 
-			BlobKey key = new BlobKey();
+			BlobKey key1 = new TransientBlobKey();
+			BlobKey key2 = new PermanentBlobKey();
 			CheckedThread[] threads = new CheckedThread[] {
-				new ContentAddressableGetStorageLocation(server, jobId, key),
-				new ContentAddressableGetStorageLocation(server, jobId, key),
-				new ContentAddressableGetStorageLocation(server, jobId, key)
+				new ContentAddressableGetStorageLocation(server, jobId, key1),
+				new ContentAddressableGetStorageLocation(server, jobId, key1),
+				new ContentAddressableGetStorageLocation(server, jobId, key1),
+				new ContentAddressableGetStorageLocation(server, jobId, key2),
+				new ContentAddressableGetStorageLocation(server, jobId, key2),
+				new ContentAddressableGetStorageLocation(server, jobId, key2)
 			};
 			checkedThreadSimpleTest(threads);
 		}
@@ -168,27 +174,27 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferSuccessfulGet1() throws IOException {
-		testPutBufferSuccessfulGet(null, null, false);
+		testPutBufferSuccessfulGet(null, null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferSuccessfulGet2() throws IOException {
-		testPutBufferSuccessfulGet(null, new JobID(), false);
+		testPutBufferSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferSuccessfulGet3() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), new JobID(), false);
+		testPutBufferSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferSuccessfulGet4() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), null, false);
+		testPutBufferSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferSuccessfulGetHa() throws IOException {
-		testPutBufferSuccessfulGet(new JobID(), new JobID(), true);
+		testPutBufferSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -199,11 +205,11 @@ public class BlobServerPutTest extends TestLogger {
 	 * 		first job id
 	 * @param jobId2
 	 * 		second job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testPutBufferSuccessfulGet(
-			@Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+			@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
 			throws IOException {
 
 		final Configuration config = new Configuration();
@@ -218,29 +224,34 @@ public class BlobServerPutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(server, jobId1, data, highAvailability);
+			BlobKey key1a = put(server, jobId1, data, blobType);
 			assertNotNull(key1a);
 
-			BlobKey key1b = put(server, jobId1, data2, highAvailability);
+			BlobKey key1b = put(server, jobId1, data2, blobType);
 			assertNotNull(key1b);
 
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(server, jobId2, data, highAvailability);
+			BlobKey key2a = put(server, jobId2, data, blobType);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(server, jobId2, data2, highAvailability);
+			BlobKey key2b = put(server, jobId2, data2, blobType);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
-			verifyContents(server, jobId2, key2a, data, highAvailability);
-			verifyContents(server, jobId2, key2b, data2, highAvailability);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
+			// not be deleted here)
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
 		}
 	}
 
@@ -248,27 +259,27 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutStreamSuccessfulGet1() throws IOException {
-		testPutStreamSuccessfulGet(null, null, false);
+		testPutStreamSuccessfulGet(null, null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutStreamSuccessfulGet2() throws IOException {
-		testPutStreamSuccessfulGet(null, new JobID(), false);
+		testPutStreamSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutStreamSuccessfulGet3() throws IOException {
-		testPutStreamSuccessfulGet(new JobID(), new JobID(), false);
+		testPutStreamSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutStreamSuccessfulGet4() throws IOException {
-		testPutStreamSuccessfulGet(new JobID(), null, false);
+		testPutStreamSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutStreamSuccessfulGetHa() throws IOException {
-		testPutStreamSuccessfulGet(new JobID(), new JobID(), true);
+		testPutStreamSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -279,11 +290,11 @@ public class BlobServerPutTest extends TestLogger {
 	 * 		first job id
 	 * @param jobId2
 	 * 		second job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testPutStreamSuccessfulGet(
-			@Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+			@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
 			throws IOException {
 
 		final Configuration config = new Configuration();
@@ -298,29 +309,34 @@ public class BlobServerPutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), highAvailability);
+			BlobKey key1a = put(server, jobId1, new ByteArrayInputStream(data), blobType);
 			assertNotNull(key1a);
 
-			BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), highAvailability);
+			BlobKey key1b = put(server, jobId1, new ByteArrayInputStream(data2), blobType);
 			assertNotNull(key1b);
 
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), highAvailability);
+			BlobKey key2a = put(server, jobId2, new ByteArrayInputStream(data), blobType);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), highAvailability);
+			BlobKey key2b = put(server, jobId2, new ByteArrayInputStream(data2), blobType);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
-			verifyContents(server, jobId2, key2a, data, highAvailability);
-			verifyContents(server, jobId2, key2b, data2, highAvailability);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
+			// not be deleted here)
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
 		}
 	}
 
@@ -328,27 +344,27 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutChunkedStreamSuccessfulGet1() throws IOException {
-		testPutChunkedStreamSuccessfulGet(null, null, false);
+		testPutChunkedStreamSuccessfulGet(null, null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutChunkedStreamSuccessfulGet2() throws IOException {
-		testPutChunkedStreamSuccessfulGet(null, new JobID(), false);
+		testPutChunkedStreamSuccessfulGet(null, new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutChunkedStreamSuccessfulGet3() throws IOException {
-		testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), false);
+		testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutChunkedStreamSuccessfulGet4() throws IOException {
-		testPutChunkedStreamSuccessfulGet(new JobID(), null, false);
+		testPutChunkedStreamSuccessfulGet(new JobID(), null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutChunkedStreamSuccessfulGetHa() throws IOException {
-		testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), true);
+		testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -359,11 +375,11 @@ public class BlobServerPutTest extends TestLogger {
 	 * 		first job id
 	 * @param jobId2
 	 * 		second job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testPutChunkedStreamSuccessfulGet(
-			@Nullable JobID jobId1, @Nullable JobID jobId2, boolean highAvailability)
+			@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType)
 			throws IOException {
 
 		final Configuration config = new Configuration();
@@ -378,29 +394,34 @@ public class BlobServerPutTest extends TestLogger {
 			byte[] data2 = Arrays.copyOfRange(data, 10, 54);
 
 			// put data for jobId1 and verify
-			BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), highAvailability);
+			BlobKey key1a = put(server, jobId1, new ChunkedInputStream(data, 19), blobType);
 			assertNotNull(key1a);
 
-			BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), highAvailability);
+			BlobKey key1b = put(server, jobId1, new ChunkedInputStream(data2, 19), blobType);
 			assertNotNull(key1b);
 
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
 
 			// now put data for jobId2 and verify that both are ok
-			BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), highAvailability);
+			BlobKey key2a = put(server, jobId2, new ChunkedInputStream(data, 19), blobType);
 			assertNotNull(key2a);
 			assertEquals(key1a, key2a);
 
-			BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), highAvailability);
+			BlobKey key2b = put(server, jobId2, new ChunkedInputStream(data2, 19), blobType);
 			assertNotNull(key2b);
 			assertEquals(key1b, key2b);
 
 			// verify the accessibility and the BLOB contents
-			verifyContents(server, jobId1, key1a, data, highAvailability);
-			verifyContents(server, jobId1, key1b, data2, highAvailability);
-			verifyContents(server, jobId2, key2a, data, highAvailability);
-			verifyContents(server, jobId2, key2b, data2, highAvailability);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
+
+			// verify the accessibility and the BLOB contents one more time (transient BLOBs should
+			// not be deleted here)
+			verifyContents(server, jobId1, key1a, data);
+			verifyContents(server, jobId1, key1b, data2);
+			verifyContents(server, jobId2, key2a, data);
+			verifyContents(server, jobId2, key2b, data2);
 		}
 	}
 
@@ -408,17 +429,17 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsNoJob() throws IOException {
-		testPutBufferFails(null, false);
+		testPutBufferFails(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsForJob() throws IOException {
-		testPutBufferFails(new JobID(), false);
+		testPutBufferFails(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsForJobHa() throws IOException {
-		testPutBufferFails(new JobID(), true);
+		testPutBufferFails(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -427,10 +448,10 @@ public class BlobServerPutTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFails(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFails(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -455,7 +476,7 @@ public class BlobServerPutTest extends TestLogger {
 			exception.expect(IOException.class);
 			exception.expectMessage("Cannot create directory ");
 
-			put(server, jobId, data, highAvailability);
+			put(server, jobId, data, blobType);
 
 		} finally {
 			// set writable again to make sure we can remove the directory
@@ -468,17 +489,17 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsIncomingNoJob() throws IOException {
-		testPutBufferFailsIncoming(null, false);
+		testPutBufferFailsIncoming(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsIncomingForJob() throws IOException {
-		testPutBufferFailsIncoming(new JobID(), false);
+		testPutBufferFailsIncoming(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsIncomingForJobHa() throws IOException {
-		testPutBufferFailsIncoming(new JobID(), true);
+		testPutBufferFailsIncoming(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -487,10 +508,10 @@ public class BlobServerPutTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFailsIncoming(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFailsIncoming(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -516,7 +537,7 @@ public class BlobServerPutTest extends TestLogger {
 			exception.expectMessage(" (Permission denied)");
 
 			try {
-				put(server, jobId, data, highAvailability);
+				put(server, jobId, data, blobType);
 			} finally {
 				File storageDir = tempFileDir.getParentFile();
 				// only the incoming directory should exist (no job directory!)
@@ -533,17 +554,17 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testPutBufferFailsStoreNoJob() throws IOException {
-		testPutBufferFailsStore(null, false);
+		testPutBufferFailsStore(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsStoreForJob() throws IOException {
-		testPutBufferFailsStore(new JobID(), false);
+		testPutBufferFailsStore(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testPutBufferFailsStoreForJobHa() throws IOException {
-		testPutBufferFailsStore(new JobID(), true);
+		testPutBufferFailsStore(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -552,10 +573,10 @@ public class BlobServerPutTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job id
-	 * @param highAvailability
-	 * 		whether to upload a permanent blob (<tt>true</tt>) or not
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
-	private void testPutBufferFailsStore(@Nullable final JobID jobId, boolean highAvailability)
+	private void testPutBufferFailsStore(@Nullable final JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
@@ -568,7 +589,8 @@ public class BlobServerPutTest extends TestLogger {
 			server.start();
 
 			// make sure the blob server cannot create any files in its storage dir
-			jobStoreDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile();
+			jobStoreDir = server.getStorageLocation(jobId,
+				BlobKey.createKey(blobType)).getParentFile();
 			assertTrue(jobStoreDir.setExecutable(true, false));
 			assertTrue(jobStoreDir.setReadable(true, false));
 			assertTrue(jobStoreDir.setWritable(false, false));
@@ -580,7 +602,7 @@ public class BlobServerPutTest extends TestLogger {
 			exception.expect(AccessDeniedException.class);
 
 			try {
-				put(server, jobId, data, highAvailability);
+				put(server, jobId, data, blobType);
 			} finally {
 				// there should be no remaining incoming files
 				File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
@@ -600,17 +622,17 @@ public class BlobServerPutTest extends TestLogger {
 
 	@Test
 	public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(null, false);
+		testConcurrentPutOperations(null, TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(new JobID(), false);
+		testConcurrentPutOperations(new JobID(), TRANSIENT_BLOB);
 	}
 
 	@Test
 	public void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentPutOperations(new JobID(), true);
+		testConcurrentPutOperations(new JobID(), PERMANENT_BLOB);
 	}
 
 	/**
@@ -620,11 +642,11 @@ public class BlobServerPutTest extends TestLogger {
 	 *
 	 * @param jobId
 	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
-	 * @param highAvailability
-	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 */
 	private void testConcurrentPutOperations(
-			@Nullable final JobID jobId, final boolean highAvailability)
+			@Nullable final JobID jobId, final BlobKey.BlobType blobType)
 			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -651,9 +673,9 @@ public class BlobServerPutTest extends TestLogger {
 							try {
 								BlockingInputStream inputStream =
 									new BlockingInputStream(countDownLatch, data);
-								BlobKey uploadedKey = put(server, jobId, inputStream, highAvailability);
+								BlobKey uploadedKey = put(server, jobId, inputStream, blobType);
 								// check the uploaded file's contents (concurrently)
-								verifyContents(server, jobId, uploadedKey, data, highAvailability);
+								verifyContents(server, jobId, uploadedKey, data);
 								return uploadedKey;
 							} catch (IOException e) {
 								throw new CompletionException(new FlinkException(
@@ -682,10 +704,10 @@ public class BlobServerPutTest extends TestLogger {
 			}
 
 			// check the uploaded file's contents
-			verifyContents(server, jobId, blobKey, data, highAvailability);
+			verifyContents(server, jobId, blobKey, data);
 
 			// check that we only uploaded the file once to the blob store
-			if (highAvailability) {
+			if (blobType == PERMANENT_BLOB) {
 				verify(blobStore, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
 			} else {
 				// can't really verify much in the other cases other than that the put operations should
@@ -700,35 +722,41 @@ public class BlobServerPutTest extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Helper to choose the right {@link BlobServer#put} method.
+	 * Helper to choose the right put method ({@code putPermanent} or {@code putTransient}).
+	 *
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @return blob key for the uploaded data
 	 */
-	static BlobKey put(BlobService service, @Nullable JobID jobId, InputStream data, boolean highAvailability)
+	static BlobKey put(BlobService service, @Nullable JobID jobId, InputStream data, BlobKey.BlobType blobType)
 			throws IOException {
-		if (highAvailability) {
+		if (blobType == PERMANENT_BLOB) {
 			if (service instanceof BlobServer) {
-				return ((BlobServer) service).putHA(jobId, data);
+				return ((BlobServer) service).putPermanent(jobId, data);
 			} else {
 				throw new UnsupportedOperationException("uploading streams is only possible at the BlobServer");
 			}
 		} else if (jobId == null) {
-			return service.getTransientBlobStore().put(data);
+			return service.getTransientBlobService().putTransient(data);
 		} else {
-			return service.getTransientBlobStore().put(jobId, data);
+			return service.getTransientBlobService().putTransient(jobId, data);
 		}
 	}
 
 	/**
-	 * Helper to choose the right {@link BlobServer#put} method.
+	 * Helper to choose the right upload path for the requested BLOB type (permanent or transient).
+	 *
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @return blob key for the uploaded data
 	 */
-	static BlobKey put(BlobService service, @Nullable JobID jobId, byte[] data, boolean highAvailability)
+	static BlobKey put(BlobService service, @Nullable JobID jobId, byte[] data, BlobKey.BlobType blobType)
 			throws IOException {
-		if (highAvailability) {
+		if (blobType == PERMANENT_BLOB) {
 			if (service instanceof BlobServer) {
-				return ((BlobServer) service).putHA(jobId, data);
+				return ((BlobServer) service).putPermanent(jobId, data);
 			} else {
 				// implement via JAR file upload instead:
 				File tmpFile = Files.createTempFile("blob", ".jar").toFile();
@@ -738,7 +766,7 @@ public class BlobServerPutTest extends TestLogger {
 					// uploading HA BLOBs works on BlobServer only (and, for now, via the BlobClient)
 					Configuration clientConfig = new Configuration();
 					List<Path> jars = Collections.singletonList(new Path(tmpFile.getAbsolutePath()));
-					List<BlobKey> keys = BlobClient.uploadJarFiles(serverAddress, clientConfig, jobId, jars);
+					List<PermanentBlobKey> keys = BlobClient.uploadJarFiles(serverAddress, clientConfig, jobId, jars);
 					assertEquals(1, keys.size());
 					return keys.get(0);
 				} finally {
@@ -747,9 +775,9 @@ public class BlobServerPutTest extends TestLogger {
 				}
 			}
 		} else if (jobId == null) {
-			return service.getTransientBlobStore().put(data);
+			return service.getTransientBlobService().putTransient(data);
 		} else {
-			return service.getTransientBlobStore().put(jobId, data);
+			return service.getTransientBlobService().putTransient(jobId, data);
 		}
 	}
 
@@ -764,14 +792,12 @@ public class BlobServerPutTest extends TestLogger {
 	 * 		blob key
 	 * @param data
 	 * 		expected data
-	 * @param highAvailability
-	 * 		whether to use HA mode accessors
 	 */
 	static void verifyContents(
-			BlobService blobService, @Nullable JobID jobId, BlobKey key, byte[] data, boolean highAvailability)
+			BlobService blobService, @Nullable JobID jobId, BlobKey key, byte[] data)
 			throws IOException {
 
-		File file = get(blobService, jobId, key, highAvailability);
+		File file = get(blobService, jobId, key);
 		validateGetAndClose(new FileInputStream(file), data);
 	}
 
@@ -786,14 +812,12 @@ public class BlobServerPutTest extends TestLogger {
 	 * 		blob key
 	 * @param data
 	 * 		expected data
-	 * @param highAvailability
-	 * 		whether to use HA mode accessors
 	 */
 	static void verifyContents(
-			BlobService blobService, @Nullable JobID jobId, BlobKey key, InputStream data,
-			boolean highAvailability) throws IOException {
+			BlobService blobService, @Nullable JobID jobId, BlobKey key, InputStream data)
+			throws IOException {
 
-		File file = get(blobService, jobId, key, highAvailability);
+		File file = get(blobService, jobId, key);
 		validateGetAndClose(new FileInputStream(file), data);
 	}