You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/05 14:06:58 UTC

[05/14] flink git commit: [FLINK-7068][blob] change BlobService sub-classes for permanent and transient BLOBs

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 790514c..664dc28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -62,6 +63,7 @@ public class BlobClientSslTest extends BlobClientTest {
 		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+		BLOB_SSL_SERVER.start();
 
 		sslClientConfig = new Configuration();
 		sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -80,6 +82,7 @@ public class BlobClientSslTest extends BlobClientTest {
 		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 		BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+		BLOB_NON_SSL_SERVER.start();
 
 		nonSslClientConfig = new Configuration();
 		nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
@@ -110,7 +113,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify ssl client to ssl server upload
+	 * Verify ssl client to ssl server upload.
 	 */
 	@Test
 	public void testUploadJarFilesHelper() throws Exception {
@@ -118,7 +121,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify ssl client to non-ssl server failure
+	 * Verify ssl client to non-ssl server failure.
 	 */
 	@Test(expected = IOException.class)
 	public void testSSLClientFailure() throws Exception {
@@ -127,7 +130,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify ssl client to non-ssl server failure
+	 * Verify ssl client to non-ssl server failure.
 	 */
 	@Test(expected = IOException.class)
 	public void testSSLClientFailure2() throws Exception {
@@ -136,7 +139,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl client to ssl server failure
+	 * Verify non-ssl client to ssl server failure.
 	 */
 	@Test(expected = IOException.class)
 	public void testSSLServerFailure() throws Exception {
@@ -145,7 +148,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl client to ssl server failure
+	 * Verify non-ssl client to ssl server failure.
 	 */
 	@Test(expected = IOException.class)
 	public void testSSLServerFailure2() throws Exception {
@@ -154,7 +157,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl connection sanity
+	 * Verify non-ssl connection sanity.
 	 */
 	@Test
 	public void testNonSSLConnection() throws Exception {
@@ -162,7 +165,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl connection sanity
+	 * Verify non-ssl connection sanity.
 	 */
 	@Test
 	public void testNonSSLConnection2() throws Exception {
@@ -170,7 +173,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl connection sanity
+	 * Verify non-ssl connection sanity.
 	 */
 	@Test
 	public void testNonSSLConnection3() throws Exception {
@@ -178,7 +181,7 @@ public class BlobClientSslTest extends BlobClientTest {
 	}
 
 	/**
-	 * Verify non-ssl connection sanity
+	 * Verify non-ssl connection sanity.
 	 */
 	@Test
 	public void testNonSSLConnection4() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 6d6bfd5..c4444c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -37,12 +42,11 @@ import java.net.InetSocketAddress;
 import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.TestLogger;
+import java.util.Random;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 /**
@@ -56,7 +60,7 @@ public class BlobClientTest extends TestLogger {
 	/** The instance of the (non-ssl) BLOB server used during the tests. */
 	static BlobServer BLOB_SERVER;
 
-	/** The blob service (non-ssl) client configuration */
+	/** The blob service (non-ssl) client configuration. */
 	static Configuration clientConfig;
 
 	@ClassRule
@@ -72,6 +76,7 @@ public class BlobClientTest extends TestLogger {
 			temporaryFolder.newFolder().getAbsolutePath());
 
 		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
+		BLOB_SERVER.start();
 
 		clientConfig = new Configuration();
 	}
@@ -88,7 +93,7 @@ public class BlobClientTest extends TestLogger {
 
 	/**
 	 * Creates a test buffer and fills it with a specific byte pattern.
-	 * 
+	 *
 	 * @return a test buffer filled with a specific byte pattern
 	 */
 	private static byte[] createTestBuffer() {
@@ -102,7 +107,7 @@ public class BlobClientTest extends TestLogger {
 	/**
 	 * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and
 	 * computes the file's BLOB key.
-	 * 
+	 *
 	 * @param file
 	 *        the file to prepare for the unit tests
 	 * @return the BLOB key of the prepared file
@@ -139,62 +144,56 @@ public class BlobClientTest extends TestLogger {
 	/**
 	 * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
 	 * the specified buffer.
-	 * 
-	 * @param inputStream
+	 *
+	 * @param actualInputStream
 	 *        the input stream returned from the GET operation (will be closed by this method)
-	 * @param buf
+	 * @param expectedBuf
 	 *        the buffer to compare the input stream's data to
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream
 	 */
-	static void validateGetAndClose(final InputStream inputStream, final byte[] buf) throws IOException {
+	static void validateGetAndClose(final InputStream actualInputStream, final byte[] expectedBuf) throws IOException {
 		try {
-			byte[] receivedBuffer = new byte[buf.length];
+			byte[] receivedBuffer = new byte[expectedBuf.length];
 
 			int bytesReceived = 0;
 
 			while (true) {
 
-				final int read = inputStream
-					.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
+				final int read = actualInputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
 				if (read < 0) {
 					throw new EOFException();
 				}
 				bytesReceived += read;
 
 				if (bytesReceived == receivedBuffer.length) {
-					assertEquals(-1, inputStream.read());
-					assertArrayEquals(buf, receivedBuffer);
+					assertEquals(-1, actualInputStream.read());
+					assertArrayEquals(expectedBuf, receivedBuffer);
 					return;
 				}
 			}
 		} finally {
-			inputStream.close();
+			actualInputStream.close();
 		}
 	}
 
 	/**
 	 * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
-	 * the specified file.
-	 * 
-	 * @param inputStream
+	 * the expected input stream.
+	 *
+	 * @param actualInputStream
 	 *        the input stream returned from the GET operation (will be closed by this method)
-	 * @param file
-	 *        the file to compare the input stream's data to
+	 * @param expectedInputStream
+	 *        the input stream to compare the input stream's data to
 	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the input stream or the file
+	 *         thrown if an I/O error occurs while reading any input stream
 	 */
-	private static void validateGetAndClose(final InputStream inputStream, final File file) throws IOException {
-
-		InputStream inputStream2 = null;
+	static void validateGetAndClose(InputStream actualInputStream, InputStream expectedInputStream)
+			throws IOException {
 		try {
-
-			inputStream2 = new FileInputStream(file);
-
 			while (true) {
-
-				final int r1 = inputStream.read();
-				final int r2 = inputStream2.read();
+				final int r1 = actualInputStream.read();
+				final int r2 = expectedInputStream.read();
 
 				assertEquals(r2, r1);
 
@@ -202,22 +201,45 @@ public class BlobClientTest extends TestLogger {
 					break;
 				}
 			}
-
 		} finally {
-			if (inputStream2 != null) {
-				inputStream2.close();
-			}
-			inputStream.close();
+			actualInputStream.close();
+			expectedInputStream.close();
 		}
-
 	}
 
 	/**
-	 * Tests the PUT/GET operations for content-addressable buffers.
+	 * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
+	 * the specified file.
+	 *
+	 * @param actualInputStream
+	 *        the input stream returned from the GET operation
+	 * @param expectedFile
+	 *        the file to compare the input stream's data to
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading the input stream or the file
 	 */
+	@SuppressWarnings("WeakerAccess")
+	static void validateGetAndClose(final InputStream actualInputStream, final File expectedFile) throws IOException {
+		validateGetAndClose(actualInputStream, new FileInputStream(expectedFile));
+	}
+
 	@Test
-	public void testContentAddressableBuffer() throws IOException {
+	public void testContentAddressableBufferTransientBlob() throws IOException {
+		testContentAddressableBuffer(false);
+	}
 
+	@Test
+	public void testContentAddressableBufferPermantBlob() throws IOException {
+		testContentAddressableBuffer(true);
+	}
+
+	/**
+	 * Tests the PUT/GET operations for content-addressable buffers.
+	 *
+	 * @param permanentBlob
+	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 */
+	private void testContentAddressableBuffer(boolean permanentBlob) throws IOException {
 		BlobClient client = null;
 
 		try {
@@ -230,28 +252,37 @@ public class BlobClientTest extends TestLogger {
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
 			JobID jobId = new JobID();
+			BlobKey receivedKey;
+
+			// Store the data (job-unrelated)
+			if (!permanentBlob) {
+				receivedKey = client.putBuffer(null, testBuffer, 0, testBuffer.length, false);
+				assertEquals(origKey, receivedKey);
+			}
 
-			// Store the data
-			BlobKey receivedKey = client.put(null, testBuffer);
-			assertEquals(origKey, receivedKey);
 			// try again with a job-related BLOB:
-			receivedKey = client.put(jobId, testBuffer);
+			receivedKey = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, permanentBlob);
 			assertEquals(origKey, receivedKey);
 
-			// Retrieve the data
-			validateGetAndClose(client.get(receivedKey), testBuffer);
-			validateGetAndClose(client.get(jobId, receivedKey), testBuffer);
+			// Retrieve the data (job-unrelated)
+			if (!permanentBlob) {
+				validateGetAndClose(client.getInternal(null, receivedKey, false), testBuffer);
+			}
+			// job-related
+			validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testBuffer);
 
-			// Check reaction to invalid keys
-			try (InputStream ignored = client.get(new BlobKey())) {
+			// Check reaction to invalid keys for job-unrelated blobs
+			try (InputStream ignored = client.getInternal(null, new BlobKey(), permanentBlob)) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
 				// expected
 			}
+
+			// Check reaction to invalid keys for job-related blobs
 			// new client needed (closed from failure above)
 			client = new BlobClient(serverAddress, getBlobClientConfig());
-			try (InputStream ignored = client.get(jobId, new BlobKey())) {
+			try (InputStream ignored = client.getInternal(jobId, new BlobKey(), permanentBlob)) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -275,52 +306,122 @@ public class BlobClientTest extends TestLogger {
 		return BLOB_SERVER;
 	}
 
+	@Test
+	public void testContentAddressableStreamTransientBlob() throws IOException {
+		testContentAddressableStream(false);
+	}
+
+	@Test
+	public void testContentAddressableStreamPermanentBlob() throws IOException {
+		testContentAddressableStream(true);
+	}
+
 	/**
 	 * Tests the PUT/GET operations for content-addressable streams.
+	 *
+	 * @param permanentBlob
+	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
 	 */
-	@Test
-	public void testContentAddressableStream() throws IOException {
+	private void testContentAddressableStream(boolean permanentBlob) throws IOException {
 
-		BlobClient client = null;
-		InputStream is = null;
+		File testFile = temporaryFolder.newFile();
+		BlobKey origKey = prepareTestFile(testFile);
 
-		try {
-			File testFile = File.createTempFile("testfile", ".dat");
-			testFile.deleteOnExit();
+		InputStream is = null;
 
-			BlobKey origKey = prepareTestFile(testFile);
-
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
-			client = new BlobClient(serverAddress, getBlobClientConfig());
+		try (BlobClient client = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
 
 			JobID jobId = new JobID();
+			BlobKey receivedKey;
+
+			// Store the data (job-unrelated)
+			if (!permanentBlob) {
+				is = new FileInputStream(testFile);
+				receivedKey = client.putInputStream(null, is, false);
+				assertEquals(origKey, receivedKey);
+			}
 
-			// Store the data
-			is = new FileInputStream(testFile);
-			BlobKey receivedKey = client.put(is);
-			assertEquals(origKey, receivedKey);
 			// try again with a job-related BLOB:
 			is = new FileInputStream(testFile);
-			receivedKey = client.put(jobId, is);
+			receivedKey = client.putInputStream(jobId, is, permanentBlob);
 			assertEquals(origKey, receivedKey);
 
 			is.close();
 			is = null;
 
-			// Retrieve the data
-			validateGetAndClose(client.get(receivedKey), testFile);
-			validateGetAndClose(client.get(jobId, receivedKey), testFile);
-		}
-		finally {
+			// Retrieve the data (job-unrelated)
+			if (!permanentBlob) {
+				validateGetAndClose(client.getInternal(null, receivedKey, false), testFile);
+			}
+			// job-related
+			validateGetAndClose(client.getInternal(jobId, receivedKey, permanentBlob), testFile);
+		} finally {
 			if (is != null) {
 				try {
 					is.close();
 				} catch (Throwable ignored) {}
 			}
-			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable ignored) {}
+		}
+	}
+
+	@Test
+	public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
+		testGetFailsDuringStreaming(null, false);
+	}
+
+	@Test
+	public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
+		testGetFailsDuringStreaming(new JobID(), false);
+	}
+
+	@Test
+	public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
+		testGetFailsDuringStreaming(new JobID(), true);
+	}
+
+	/**
+	 * Checks the correct result if a GET operation fails during the file download.
+	 *
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param permanentBlob
+	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 */
+	private void testGetFailsDuringStreaming(@Nullable final JobID jobId, boolean permanentBlob)
+			throws IOException {
+
+		try (BlobClient client = new BlobClient(
+			new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig())) {
+
+			byte[] data = new byte[5000000];
+			Random rnd = new Random();
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.putBuffer(jobId, data, 0, data.length, permanentBlob);
+			assertNotNull(key);
+
+			// issue a GET request that succeeds
+			InputStream is = client.getInternal(jobId, key, permanentBlob);
+
+			byte[] receiveBuffer = new byte[data.length];
+			int firstChunkLen = 50000;
+			BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null);
+			BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null);
+
+			// shut down the server
+			for (BlobServerConnection conn : getBlobServer().getCurrentActiveConnections()) {
+				conn.close();
+			}
+
+			try {
+				BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null);
+				// we tolerate that this succeeds, as the receiver socket may have buffered
+				// everything already, but in this case, also verify the contents
+				assertArrayEquals(data, receiveBuffer);
+			}
+			catch (IOException e) {
+				// expected
 			}
 		}
 	}
@@ -357,7 +458,7 @@ public class BlobClientTest extends TestLogger {
 		assertEquals(1, blobKeys.size());
 
 		try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
-			validateGetAndClose(blobClient.get(jobId, blobKeys.get(0)), testFile);
+			validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0), true), testFile);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 43bc622..3797f87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * This class contains unit tests for the {@link BlobKey} class.
  */
@@ -46,7 +43,8 @@ public final class BlobKeyTest extends TestLogger {
 	 * The second key array to be used during the unit tests.
 	 */
 	private static final byte[] KEY_ARRAY_2 = new byte[20];
-	/**
+
+	/*
 	 * Initialize the key array.
 	 */
 	static {
@@ -57,7 +55,7 @@ public final class BlobKeyTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the serialization/deserialization of BLOB keys
+	 * Tests the serialization/deserialization of BLOB keys.
 	 */
 	@Test
 	public void testSerialization() throws Exception {
@@ -99,10 +97,10 @@ public final class BlobKeyTest extends TestLogger {
 	public void testStreams() throws Exception {
 		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
-		
+
 		k1.writeToOutputStream(baos);
 		baos.close();
-		
+
 		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 		final BlobKey k2 = BlobKey.readFromInputStream(bais);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
deleted file mode 100644
index 81304f4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.util.TestLogger;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class BlobRecoveryITCase extends TestLogger {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	/**
-	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
-	 * participating BlobServer.
-	 */
-	@Test
-	public void testBlobServerRecovery() throws Exception {
-		Configuration config = new Configuration();
-		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
-		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
-
-		BlobStoreService blobStoreService = null;
-
-		try {
-			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
-			testBlobServerRecovery(config, blobStoreService);
-		} finally {
-			if (blobStoreService != null) {
-				blobStoreService.closeAndCleanupAllData();
-			}
-		}
-	}
-
-	public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
-		final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
-		String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
-		Random rand = new Random();
-
-		BlobServer[] server = new BlobServer[2];
-		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
-		BlobClient client = null;
-
-		try {
-			for (int i = 0; i < server.length; i++) {
-				server[i] = new BlobServer(config, blobStore);
-				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
-			}
-
-			client = new BlobClient(serverAddress[0], config);
-
-			// Random data
-			byte[] expected = new byte[1024];
-			rand.nextBytes(expected);
-
-			BlobKey[] keys = new BlobKey[2];
-
-			// Put job-unrelated data
-			keys[0] = client.put(null, expected); // Request 1
-			keys[1] = client.put(null, expected, 32, 256); // Request 2
-
-			// Put job-related data, verify that the checksums match
-			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
-			BlobKey key;
-			key = client.put(jobId[0], expected); // Request 3
-			assertEquals(keys[0], key);
-			key = client.put(jobId[1], expected, 32, 256); // Request 4
-			assertEquals(keys[1], key);
-
-			// check that the storage directory exists
-			final Path blobServerPath = new Path(storagePath, "blob");
-			FileSystem fs = blobServerPath.getFileSystem();
-			assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
-
-			// Close the client and connect to the other server
-			client.close();
-			client = new BlobClient(serverAddress[1], config);
-
-			// Verify request 1
-			try (InputStream is = client.get(keys[0])) {
-				byte[] actual = new byte[expected.length];
-
-				BlobUtils.readFully(is, actual, 0, expected.length, null);
-
-				for (int i = 0; i < expected.length; i++) {
-					assertEquals(expected[i], actual[i]);
-				}
-			}
-
-			// Verify request 2
-			try (InputStream is = client.get(keys[1])) {
-				byte[] actual = new byte[256];
-				BlobUtils.readFully(is, actual, 0, 256, null);
-
-				for (int i = 32, j = 0; i < 256; i++, j++) {
-					assertEquals(expected[i], actual[j]);
-				}
-			}
-
-			// Verify request 3
-			try (InputStream is = client.get(jobId[0], keys[0])) {
-				byte[] actual = new byte[expected.length];
-				BlobUtils.readFully(is, actual, 0, expected.length, null);
-
-				for (int i = 0; i < expected.length; i++) {
-					assertEquals(expected[i], actual[i]);
-				}
-			}
-
-			// Verify request 4
-			try (InputStream is = client.get(jobId[1], keys[1])) {
-				byte[] actual = new byte[256];
-				BlobUtils.readFully(is, actual, 0, 256, null);
-
-				for (int i = 32, j = 0; i < 256; i++, j++) {
-					assertEquals(expected[i], actual[j]);
-				}
-			}
-
-			// Remove again
-			client.delete(keys[0]);
-			client.delete(keys[1]);
-			client.delete(jobId[0], keys[0]);
-			client.delete(jobId[1], keys[1]);
-
-			// Verify everything is clean
-			assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
-			if (fs.exists(blobServerPath)) {
-				final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
-					fs.listStatus(blobServerPath);
-				ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
-				for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
-					filenames.add(file.toString());
-				}
-				fail("Unclean state backend: " + filenames);
-			}
-		}
-		finally {
-			for (BlobServer s : server) {
-				if (s != null) {
-					s.close();
-				}
-			}
-
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
new file mode 100644
index 0000000..c4c6762
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests how GET requests react to corrupt files when downloaded via a {@link BlobServer}.
+ *
+ * <p>Successful GET requests are tested in conjunction wit the PUT requests.
+ */
+public class BlobServerCorruptionTest extends TestLogger {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	/**
+	 * Checks the GET operation fails when the downloaded file (from {@link BlobServer} or HA store)
+	 * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
+	 */
+	@Test
+	public void testGetFailsFromCorruptFile() throws IOException {
+
+		final Configuration config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
+
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testGetFailsFromCorruptFile(config, blobStoreService, exception);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
+	}
+
+	/**
+	 * Checks the GET operation fails when the downloaded file (from HA store)
+	 * is corrupt, i.e. its content's hash does not match the {@link BlobKey}'s hash.
+	 *
+	 * @param config
+	 * 		blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
+	 * 		and {@link HighAvailabilityOptions#HA_CLUSTER_ID}) used to set up <tt>blobStore</tt>
+	 * @param blobStore
+	 * 		shared HA blob store to use
+	 * @param expectedException
+	 * 		expected exception rule to use
+	 */
+	public static void testGetFailsFromCorruptFile(
+			Configuration config, BlobStore blobStore, ExpectedException expectedException)
+			throws IOException {
+
+		Random rnd = new Random();
+		JobID jobId = new JobID();
+
+		try (BlobServer server = new BlobServer(config, blobStore)) {
+
+			server.start();
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = put(server, jobId, data, true);
+			assertNotNull(key);
+
+			// delete local file to make sure that the GET requests downloads from HA
+			File blobFile = server.getStorageLocation(jobId, key);
+			assertTrue(blobFile.delete());
+
+			// change HA store file contents to make sure that GET requests fail
+			byte[] data2 = Arrays.copyOf(data, data.length);
+			data2[0] ^= 1;
+			File tmpFile = Files.createTempFile("blob", ".jar").toFile();
+			try {
+				FileUtils.writeByteArrayToFile(tmpFile, data2);
+				blobStore.put(tmpFile, jobId, key);
+			} finally {
+				//noinspection ResultOfMethodCallIgnored
+				tmpFile.delete();
+			}
+
+			// issue a GET request that fails
+			expectedException.expect(IOException.class);
+			expectedException.expectMessage("data corruption");
+
+			get(server, jobId, key, true);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index aad8816..bb977d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -30,11 +30,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -44,16 +45,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
-import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
-import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests how DELETE requests behave.
@@ -66,315 +65,279 @@ public class BlobServerDeleteTest extends TestLogger {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void testDeleteSingleByBlobKey() throws IOException {
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
+	public void testDeleteTransient1() throws IOException {
+		testDeleteTransient(null, new JobID());
+	}
+
+	@Test
+	public void testDeleteTransient2() throws IOException {
+		testDeleteTransient(new JobID(), null);
+	}
+
+	@Test
+	public void testDeleteTransient3() throws IOException {
+		testDeleteTransient(null, null);
+	}
+
+	@Test
+	public void testDeleteTransient4() throws IOException {
+		testDeleteTransient(new JobID(), new JobID());
+	}
+
+	/**
+	 * Uploads a (different) byte array for each of the given jobs and verifies that deleting one of
+	 * them (via the {@link BlobServer}) does not influence the other.
+	 *
+	 * @param jobId1
+	 * 		first job id
+	 * @param jobId2
+	 * 		second job id
+	 */
+	private void testDeleteTransient(@Nullable JobID jobId1, @Nullable JobID jobId2)
+			throws IOException {
 
-		try {
-			final Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-			server = new BlobServer(config, blobStore);
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
+			server.start();
 
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
+			byte[] data2 = Arrays.copyOf(data, data.length);
+			data2[0] ^= 1;
 
-			// put job-unrelated (like libraries)
-			BlobKey key1 = client.put(null, data);
+			// put first BLOB
+			BlobKey key1 = put(server, jobId1, data, false);
 			assertNotNull(key1);
 
-			// second job-unrelated item
-			data[0] ^= 1;
-			BlobKey key2 = client.put(null, data);
-			assertNotNull(key2);
-			assertNotEquals(key1, key2);
-
-			// put job-related with same key1 as non-job-related
-			data[0] ^= 1; // back to the original data
-			final JobID jobId = new JobID();
-			BlobKey key1b = client.put(jobId, data);
-			assertNotNull(key1b);
-			assertEquals(key1, key1b);
-
-			// issue a DELETE request via the client
-			client.delete(key1);
-			client.close();
-
-			client = new BlobClient(serverAddress, config);
-			try (InputStream ignored = client.get(key1)) {
-				fail("BLOB should have been deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
+			// put two more BLOBs (same key, other key) for another job ID
+			BlobKey key2a = put(server, jobId2, data, false);
+			assertNotNull(key2a);
+			assertEquals(key1, key2a);
+			BlobKey key2b = put(server, jobId2, data2, false);
+			assertNotNull(key2b);
 
-			ensureClientIsClosed(client);
+			// issue a DELETE request
+			assertTrue(delete(server, jobId1, key1));
 
-			client = new BlobClient(serverAddress, config);
-			try {
-				// NOTE: the server will stall in its send operation until either the data is fully
-				//       read or the socket is closed, e.g. via a client.close() call
-				validateGetAndClose(client.get(jobId, key1), data);
+			verifyDeleted(server, jobId1, key1, false);
+			// deleting a one BLOB should not affect another BLOB, even with the same key if job IDs are different
+			if ((jobId1 == null && jobId2 != null) || (jobId1 != null && !jobId1.equals(jobId2))) {
+				verifyContents(server, jobId2, key2a, data, false);
 			}
-			catch (IOException e) {
-				fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
-			}
-			client.close();
+			verifyContents(server, jobId2, key2b, data2, false);
 
-			// delete a file directly on the server
-			server.delete(key2);
-			try {
-				server.getFile(key2);
-				fail("BLOB should have been deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
-		}
-		finally {
-			cleanup(server, client);
-		}
-	}
+			// delete first file of second job
+			assertTrue(delete(server, jobId2, key2a));
+			verifyDeleted(server, jobId2, key2a, false);
+			verifyContents(server, jobId2, key2b, data2, false);
 
-	private static void ensureClientIsClosed(final BlobClient client) throws IOException {
-		try {
-			client.put(null, new byte[1]);
-			fail("client should be closed after erroneous operation");
-		}
-		catch (IllegalStateException e) {
-			// expected
-		} finally {
-			client.close();
+			// delete second file of second job
+			assertTrue(delete(server, jobId2, key2a));
+			verifyDeleted(server, jobId2, key2a, false);
+			verifyContents(server, jobId2, key2b, data2, false);
 		}
 	}
 
 	@Test
-	public void testDeleteAlreadyDeletedNoJob() throws IOException {
-		testDeleteAlreadyDeleted(null);
+	public void testDeleteTransientAlreadyDeletedNoJob() throws IOException {
+		testDeleteTransientAlreadyDeleted(null);
 	}
 
 	@Test
-	public void testDeleteAlreadyDeletedForJob() throws IOException {
-		testDeleteAlreadyDeleted(new JobID());
+	public void testDeleteTransientAlreadyDeletedForJob() throws IOException {
+		testDeleteTransientAlreadyDeleted(new JobID());
 	}
 
-	private void testDeleteAlreadyDeleted(final JobID jobId) throws IOException {
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
+	/**
+	 * Uploads a byte array for the given job and verifies that deleting it (via the {@link
+	 * BlobServer}) does not fail independent of whether the file exists.
+	 *
+	 * @param jobId
+	 * 		job id
+	 */
+	private void testDeleteTransientAlreadyDeleted(@Nullable final JobID jobId) throws IOException {
 
-		try {
-			final Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-			server = new BlobServer(config, blobStore);
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
+			server.start();
 
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put file
-			BlobKey key = client.put(jobId, data);
+			// put BLOB
+			BlobKey key = put(server, jobId, data, false);
 			assertNotNull(key);
 
 			File blobFile = server.getStorageLocation(jobId, key);
 			assertTrue(blobFile.delete());
 
-			// issue a DELETE request via the client
-			try {
-				deleteHelper(client, jobId, key);
-			}
-			catch (IOException e) {
-				fail("DELETE operation should not fail if file is already deleted");
-			}
-
-			// issue a DELETE request on the server
-			if (jobId == null) {
-				server.delete(key);
-			} else {
-				server.delete(jobId, key);
-			}
-		}
-		finally {
-			cleanup(server, client);
-		}
-	}
+			// DELETE operation should not fail if file is already deleted
+			assertTrue(delete(server, jobId, key));
+			verifyDeleted(server, jobId, key, false);
 
-	private static void deleteHelper(BlobClient client, JobID jobId, BlobKey key) throws IOException {
-		if (jobId == null) {
-			client.delete(key);
-		} else {
-			client.delete(jobId, key);
+			// one more delete call that should not fail
+			assertTrue(delete(server, jobId, key));
+			verifyDeleted(server, jobId, key, false);
 		}
 	}
 
 	@Test
-	public void testDeleteFailsNoJob() throws IOException {
-		testDeleteFails(null);
+	public void testDeleteTransientFailsNoJob() throws IOException {
+		testDeleteTransientFails(null);
 	}
 
 	@Test
-	public void testDeleteFailsForJob() throws IOException {
-		testDeleteFails(new JobID());
+	public void testDeleteTransientFailsForJob() throws IOException {
+		testDeleteTransientFails(new JobID());
 	}
 
-	private void testDeleteFails(final JobID jobId) throws IOException {
+	/**
+	 * Uploads a byte array for the given job and verifies that a delete operation (via the {@link
+	 * BlobServer}) does not fail even if the file is not deletable, e.g. via restricting the
+	 * permissions.
+	 *
+	 * @param jobId
+	 * 		job id
+	 */
+	private void testDeleteTransientFails(@Nullable final JobID jobId) throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
-		BlobServer server = null;
-		BlobClient client = null;
-		BlobStore blobStore = new VoidBlobStore();
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
 		File blobFile = null;
 		File directory = null;
-		try {
-			final Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-			server = new BlobServer(config, blobStore);
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
-
-			byte[] data = new byte[2000000];
-			rnd.nextBytes(data);
+			server.start();
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(jobId, data);
-			assertNotNull(key);
-
-			blobFile = server.getStorageLocation(jobId, key);
-			directory = blobFile.getParentFile();
-
-			assertTrue(blobFile.setWritable(false, false));
-			assertTrue(directory.setWritable(false, false));
-
-			// issue a DELETE request via the client
-			deleteHelper(client, jobId, key);
-
-			// issue a DELETE request on the server
-			if (jobId == null) {
-				server.delete(key);
-			} else {
-				server.delete(jobId, key);
-			}
-
-			// the file should still be there
-			if (jobId == null) {
-				server.getFile(key);
-			} else {
-				server.getFile(jobId, key);
-			}
-		} finally {
-			if (blobFile != null && directory != null) {
-				//noinspection ResultOfMethodCallIgnored
-				blobFile.setWritable(true, false);
-				//noinspection ResultOfMethodCallIgnored
-				directory.setWritable(true, false);
+			try {
+				byte[] data = new byte[2000000];
+				rnd.nextBytes(data);
+
+				// put BLOB
+				BlobKey key = put(server, jobId, data, false);
+				assertNotNull(key);
+
+				blobFile = server.getStorageLocation(jobId, key);
+				directory = blobFile.getParentFile();
+
+				assertTrue(blobFile.setWritable(false, false));
+				assertTrue(directory.setWritable(false, false));
+
+				// issue a DELETE request
+				assertFalse(delete(server, jobId, key));
+
+				// the file should still be there
+				verifyContents(server, jobId, key, data, false);
+			} finally {
+				if (blobFile != null && directory != null) {
+					//noinspection ResultOfMethodCallIgnored
+					blobFile.setWritable(true, false);
+					//noinspection ResultOfMethodCallIgnored
+					directory.setWritable(true, false);
+				}
 			}
-			cleanup(server, client);
 		}
 	}
 
-	/**
-	 * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
-	 */
 	@Test
 	public void testJobCleanup() throws IOException, InterruptedException {
+		testJobCleanup(false);
+	}
+
+	@Test
+	public void testJobCleanupHa() throws IOException, InterruptedException {
+		testJobCleanup(true);
+	}
 
+	/**
+	 * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}.
+	 *
+	 * @param highAvailability
+	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 */
+	private void testJobCleanup(boolean highAvailability) throws IOException {
 		JobID jobId1 = new JobID();
-		List<BlobKey> keys1 = new ArrayList<BlobKey>();
 		JobID jobId2 = new JobID();
-		List<BlobKey> keys2 = new ArrayList<BlobKey>();
-		BlobServer server = null;
 
-		final byte[] buf = new byte[128];
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
 
-		try {
-			Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY,
-				temporaryFolder.newFolder().getAbsolutePath());
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			server = new BlobServer(config, new VoidBlobStore());
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			BlobClient bc = new BlobClient(serverAddress, config);
+			server.start();
 
-			keys1.add(bc.put(jobId1, buf));
-			keys2.add(bc.put(jobId2, buf));
-			assertEquals(keys2.get(0), keys1.get(0));
+			final byte[] data = new byte[128];
+			byte[] data2 = Arrays.copyOf(data, data.length);
+			data2[0] ^= 1;
 
-			buf[0] += 1;
-			keys1.add(bc.put(jobId1, buf));
+			BlobKey key1a = put(server, jobId1, data, highAvailability);
+			BlobKey key2 = put(server, jobId2, data, highAvailability);
+			assertEquals(key1a, key2);
 
-			bc.close();
+			BlobKey key1b = put(server, jobId1, data2, highAvailability);
 
-			assertEquals(2, checkFilesExist(jobId1, keys1, server, true));
+			verifyContents(server, jobId1, key1a, data, highAvailability);
+			verifyContents(server, jobId1, key1b, data2, highAvailability);
 			checkFileCountForJob(2, jobId1, server);
-			assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+
+			verifyContents(server, jobId2, key2, data, highAvailability);
 			checkFileCountForJob(1, jobId2, server);
 
 			server.cleanupJob(jobId1);
 
+			verifyDeleted(server, jobId1, key1a, highAvailability);
+			verifyDeleted(server, jobId1, key1b, highAvailability);
 			checkFileCountForJob(0, jobId1, server);
-			assertEquals(1, checkFilesExist(jobId2, keys2, server, true));
+			verifyContents(server, jobId2, key2, data, highAvailability);
 			checkFileCountForJob(1, jobId2, server);
 
 			server.cleanupJob(jobId2);
 
 			checkFileCountForJob(0, jobId1, server);
+			verifyDeleted(server, jobId2, key2, highAvailability);
 			checkFileCountForJob(0, jobId2, server);
 
 			// calling a second time should not fail
 			server.cleanupJob(jobId2);
 		}
-		finally {
-			if (server != null) {
-				server.close();
-			}
-		}
 	}
 
-	/**
-	 * FLINK-6020
-	 *
-	 * Tests that concurrent delete operations don't interfere with each other.
-	 *
-	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exist
-	 * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
-	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
-	 * broken.
-	 */
 	@Test
 	public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(null);
 	}
 
-	/**
-	 * FLINK-6020
-	 *
-	 * Tests that concurrent delete operations don't interfere with each other.
-	 *
-	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exist
-	 * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
-	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
-	 * broken.
-	 */
 	@Test
 	public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
 		testConcurrentDeleteOperations(new JobID());
 	}
 
-	private void testConcurrentDeleteOperations(final JobID jobId)
-		throws IOException, InterruptedException, ExecutionException {
+	/**
+	 * [FLINK-6020] Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * <p>Note: This test checks that there cannot be two threads which have checked whether a given
+	 * blob file exist and then one of them fails deleting it. Without the introduced lock, this
+	 * situation should rarely happen and make this test fail. Thus, if this test should become
+	 * "unstable", then the delete atomicity is most likely broken.
+	 *
+	 * @param jobId
+	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
+	 */
+	private void testConcurrentDeleteOperations(@Nullable final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
+
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-		final BlobStore blobStore = mock(BlobStore.class);
 
 		final int concurrentDeleteOperations = 3;
 		final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
@@ -383,28 +346,27 @@ public class BlobServerDeleteTest extends TestLogger {
 
 		final byte[] data = {1, 2, 3};
 
-		try (final BlobServer blobServer = new BlobServer(config, blobStore)) {
+		try (final BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			final BlobKey blobKey;
+			server.start();
 
-			try (BlobClient client = blobServer.createClient()) {
-				blobKey = client.put(jobId, data);
-			}
+			final BlobKey blobKey = put(server, jobId, data, false);
 
-			assertTrue(blobServer.getStorageLocation(jobId, blobKey).exists());
+			assertTrue(server.getStorageLocation(jobId, blobKey).exists());
 
 			for (int i = 0; i < concurrentDeleteOperations; i++) {
 				CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
 					() -> {
-						try (BlobClient blobClient = blobServer.createClient()) {
-							deleteHelper(blobClient, jobId, blobKey);
+						try {
+							assertTrue(delete(server, jobId, blobKey));
+							assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+							return null;
 						} catch (IOException e) {
-							throw new CompletionException(new FlinkException("Could not delete the given blob key " + blobKey + '.', e));
+							throw new CompletionException(new FlinkException(
+								"Could not delete the given blob key " + blobKey + '.'));
 						}
-
-						return null;
 					},
-					executor);
+				executor);
 
 				deleteFutures.add(deleteFuture);
 			}
@@ -415,26 +377,30 @@ public class BlobServerDeleteTest extends TestLogger {
 			// in case of no lock, one of the delete operations should eventually fail
 			waitFuture.get();
 
-			assertFalse(blobServer.getStorageLocation(jobId, blobKey).exists());
+			assertFalse(server.getStorageLocation(jobId, blobKey).exists());
+
 		} finally {
 			executor.shutdownNow();
 		}
 	}
 
-	private static void cleanup(BlobServer server, BlobClient client) {
-		if (client != null) {
-			try {
-				client.close();
-			} catch (Throwable t) {
-				t.printStackTrace();
-			}
-		}
-		if (server != null) {
-			try {
-				server.close();
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
+	/**
+	 * Deletes a transient BLOB from the given BLOB service.
+	 *
+	 * @param service
+	 * 		blob service
+	 * @param jobId
+	 * 		job ID (or <tt>null</tt> if job-unrelated)
+	 * @param key
+	 * 		blob key
+	 *
+	 * @return delete success
+	 */
+	static boolean delete(BlobService service, @Nullable JobID jobId, BlobKey key) {
+		if (jobId == null) {
+			return service.getTransientBlobStore().delete(key);
+		} else {
+			return service.getTransientBlobStore().delete(jobId, key);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/071e27f7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index d3d3484..0873aba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -21,27 +21,33 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import javax.annotation.Nullable;
+
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.NoSuchFileException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -51,22 +57,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
- * Tests how failing GET requests behave in the presence of failures.
- * Successful GET requests are tested in conjunction wit the PUT
- * requests.
+ * Tests how failing GET requests behave in the presence of failures when used with a {@link
+ * BlobServer}.
+ *
+ * <p>Successful GET requests are tested in conjunction wit the PUT requests.
  */
 public class BlobServerGetTest extends TestLogger {
 
@@ -75,19 +82,27 @@ public class BlobServerGetTest extends TestLogger {
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
 	@Test
 	public void testGetFailsDuringLookup1() throws IOException {
-		testGetFailsDuringLookup(null, new JobID());
+		testGetFailsDuringLookup(null, new JobID(), false);
 	}
 
 	@Test
 	public void testGetFailsDuringLookup2() throws IOException {
-		testGetFailsDuringLookup(new JobID(), new JobID());
+		testGetFailsDuringLookup(new JobID(), new JobID(), false);
 	}
 
 	@Test
 	public void testGetFailsDuringLookup3() throws IOException {
-		testGetFailsDuringLookup(new JobID(), null);
+		testGetFailsDuringLookup(new JobID(), null, false);
+	}
+
+	@Test
+	public void testGetFailsDuringLookupHa() throws IOException {
+		testGetFailsDuringLookup(new JobID(), new JobID(), true);
 	}
 
 	/**
@@ -96,25 +111,21 @@ public class BlobServerGetTest extends TestLogger {
 	 * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
 	 * @param jobId2 second job ID different to <tt>jobId1</tt>
 	 */
-	private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2)
-		throws IOException {
-		BlobServer server = null;
-		BlobClient client = null;
-
-		try {
-			final Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+	private void testGetFailsDuringLookup(
+			@Nullable final JobID jobId1, @Nullable final JobID jobId2, boolean highAvailability)
+			throws IOException {
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
-			server = new BlobServer(config, new VoidBlobStore());
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
 
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
+			server.start();
 
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(jobId1, data);
+			BlobKey key = put(server, jobId1, data, highAvailability);
 			assertNotNull(key);
 
 			// delete file to make sure that GET requests fail
@@ -122,148 +133,233 @@ public class BlobServerGetTest extends TestLogger {
 			assertTrue(blobFile.delete());
 
 			// issue a GET request that fails
-			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+			verifyDeleted(server, jobId1, key, highAvailability);
 
-			BlobKey key2 = client.put(jobId2, data);
+			// add the same data under a second jobId
+			BlobKey key2 = put(server, jobId2, data, highAvailability);
 			assertNotNull(key);
 			assertEquals(key, key2);
+
 			// request for jobId2 should succeed
-			validateGetAndClose(getFileHelper(client, jobId2, key), data);
+			get(server, jobId2, key, highAvailability);
 			// request for jobId1 should still fail
-			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+			verifyDeleted(server, jobId1, key, highAvailability);
 
 			// same checks as for jobId1 but for jobId2 should also work:
 			blobFile = server.getStorageLocation(jobId2, key);
 			assertTrue(blobFile.delete());
-			client = verifyDeleted(client, jobId2, key, serverAddress, config);
-
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-			if (server != null) {
-				server.close();
-			}
+			verifyDeleted(server, jobId2, key, highAvailability);
 		}
 	}
 
 	/**
-	 * Checks that the given blob does not exist anymore.
-	 *
-	 * @param client
-	 * 		BLOB client to use for connecting to the BLOB server
-	 * @param jobId
-	 * 		job ID or <tt>null</tt> if job-unrelated
-	 * @param key
-	 * 		key identifying the BLOB to request
-	 * @param serverAddress
-	 * 		BLOB server address
-	 * @param config
-	 * 		client config
-	 *
-	 * @return a new client (since the old one is being closed on failure)
+	 * Retrieves a BLOB from the HA store to a {@link BlobServer} which cannot create incoming
+	 * files. File transfers should fail.
 	 */
-	private static BlobClient verifyDeleted(
-			BlobClient client, JobID jobId, BlobKey key,
-			InetSocketAddress serverAddress, Configuration config) throws IOException {
-		try (InputStream ignored = getFileHelper(client, jobId, key)) {
-			fail("This should not succeed.");
-		} catch (IOException e) {
-			// expected
-		}
-		// need a new client (old ony closed due to failure
-		return new BlobClient(serverAddress, config);
-	}
-
 	@Test
-	public void testGetFailsDuringStreamingNoJob() throws IOException {
-		testGetFailsDuringStreaming(null);
-	}
+	public void testGetFailsIncomingForJobHa() throws IOException {
+		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
-	@Test
-	public void testGetFailsDuringStreamingForJob() throws IOException {
-		testGetFailsDuringStreaming(new JobID());
+		final JobID jobId = new JobID();
+
+		final Configuration config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
+
+		BlobStoreService blobStore = null;
+
+		try {
+			blobStore = BlobUtils.createBlobStoreFromConfig(config);
+
+			File tempFileDir = null;
+			try (BlobServer server = new BlobServer(config, blobStore)) {
+
+				server.start();
+
+				// store the data on the server (and blobStore), remove from local store
+				byte[] data = new byte[2000000];
+				rnd.nextBytes(data);
+				BlobKey blobKey = put(server, jobId, data, true);
+				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+
+				// make sure the blob server cannot create any files in its storage dir
+				tempFileDir = server.createTemporaryFilename().getParentFile();
+				assertTrue(tempFileDir.setExecutable(true, false));
+				assertTrue(tempFileDir.setReadable(true, false));
+				assertTrue(tempFileDir.setWritable(false, false));
+
+				// request the file from the BlobStore
+				exception.expect(IOException.class);
+				exception.expectMessage("Permission denied");
+
+				try {
+					get(server, jobId, blobKey, true);
+				} finally {
+					HashSet<String> expectedDirs = new HashSet<>();
+					expectedDirs.add("incoming");
+					expectedDirs.add(JOB_DIR_PREFIX + jobId);
+					// only the incoming and job directory should exist (no job directory!)
+					File storageDir = tempFileDir.getParentFile();
+					String[] actualDirs = storageDir.list();
+					assertNotNull(actualDirs);
+					assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs)));
+
+					// job directory should be empty
+					File jobDir = new File(tempFileDir.getParentFile(), JOB_DIR_PREFIX + jobId);
+					assertArrayEquals(new String[] {}, jobDir.list());
+				}
+			} finally {
+				// set writable again to make sure we can remove the directory
+				if (tempFileDir != null) {
+					//noinspection ResultOfMethodCallIgnored
+					tempFileDir.setWritable(true, false);
+				}
+			}
+		} finally {
+			if (blobStore != null) {
+				blobStore.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	/**
-	 * Checks the correct result if a GET operation fails during the file download.
-	 *
-	 * @param jobId job ID or <tt>null</tt> if job-unrelated
+	 * Retrieves a BLOB from the HA store to a {@link BlobServer} which cannot create the final
+	 * storage file. File transfers should fail.
 	 */
-	private void testGetFailsDuringStreaming(final JobID jobId) throws IOException {
-		BlobServer server = null;
-		BlobClient client = null;
+	@Test
+	public void testGetFailsStoreForJobHa() throws IOException {
+		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
-		try {
-			final Configuration config = new Configuration();
-			config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		final JobID jobId = new JobID();
+
+		final Configuration config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
-			server = new BlobServer(config, new VoidBlobStore());
+		BlobStoreService blobStore = null;
 
-			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			client = new BlobClient(serverAddress, config);
+		try {
+			blobStore = BlobUtils.createBlobStoreFromConfig(config);
 
-			byte[] data = new byte[5000000];
-			rnd.nextBytes(data);
+			File jobStoreDir = null;
+			try (BlobServer server = new BlobServer(config, blobStore)) {
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(jobId, data);
-			assertNotNull(key);
+				server.start();
 
-			// issue a GET request that succeeds
-			InputStream is = getFileHelper(client, jobId, key);
+				// store the data on the server (and blobStore), remove from local store
+				byte[] data = new byte[2000000];
+				rnd.nextBytes(data);
+				BlobKey blobKey = put(server, jobId, data, true);
+				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
 
-			byte[] receiveBuffer = new byte[data.length];
-			int firstChunkLen = 50000;
-			BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null);
-			BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null);
+				// make sure the blob cache cannot create any files in its storage dir
+				jobStoreDir = server.getStorageLocation(jobId, blobKey).getParentFile();
+				assertTrue(jobStoreDir.setExecutable(true, false));
+				assertTrue(jobStoreDir.setReadable(true, false));
+				assertTrue(jobStoreDir.setWritable(false, false));
 
-			// shut down the server
-			for (BlobServerConnection conn : server.getCurrentActiveConnections()) {
-				conn.close();
-			}
+				// request the file from the BlobStore
+				exception.expect(AccessDeniedException.class);
 
-			try {
-				BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null);
-				// we tolerate that this succeeds, as the receiver socket may have buffered
-				// everything already, but in this case, also verify the contents
-				assertArrayEquals(data, receiveBuffer);
-			}
-			catch (IOException e) {
-				// expected
+				try {
+					get(server, jobId, blobKey, true);
+				} finally {
+					// there should be no remaining incoming files
+					File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
+					assertArrayEquals(new String[] {}, incomingFileDir.list());
+
+					// there should be no files in the job directory
+					assertArrayEquals(new String[] {}, jobStoreDir.list());
+				}
+			} finally {
+				// set writable again to make sure we can remove the directory
+				if (jobStoreDir != null) {
+					//noinspection ResultOfMethodCallIgnored
+					jobStoreDir.setWritable(true, false);
+				}
 			}
-			is.close();
 		} finally {
-			if (client != null) {
-				client.close();
-			}
-			if (server != null) {
-				server.close();
+			if (blobStore != null) {
+				blobStore.closeAndCleanupAllData();
 			}
 		}
 	}
 
 	/**
-	 * FLINK-6020
-	 *
-	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 * Retrieves a BLOB from the HA store to a {@link BlobServer} whose HA store does not contain
+	 * the file. File transfers should fail.
 	 */
 	@Test
+	public void testGetFailsHaStoreForJobHa() throws IOException {
+		final JobID jobId = new JobID();
+
+		final Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+		try (BlobServer server = new BlobServer(config, new VoidBlobStore())) {
+
+			server.start();
+
+			// store the data on the server (and blobStore), remove from local store
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+			BlobKey blobKey = put(server, jobId, data, true);
+			assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+
+			File tempFileDir = server.createTemporaryFilename().getParentFile();
+
+			// request the file from the BlobStore
+			exception.expect(NoSuchFileException.class);
+
+			try {
+				get(server, jobId, blobKey, true);
+			} finally {
+				HashSet<String> expectedDirs = new HashSet<>();
+				expectedDirs.add("incoming");
+				expectedDirs.add(JOB_DIR_PREFIX + jobId);
+				// only the incoming and job directory should exist (no job directory!)
+				File storageDir = tempFileDir.getParentFile();
+				String[] actualDirs = storageDir.list();
+				assertNotNull(actualDirs);
+				assertEquals(expectedDirs, new HashSet<>(Arrays.asList(actualDirs)));
+
+				// job directory should be empty
+				File jobDir = new File(tempFileDir.getParentFile(), JOB_DIR_PREFIX + jobId);
+				assertArrayEquals(new String[] {}, jobDir.list());
+			}
+		}
+	}
+
+	@Test
 	public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(null);
+		testConcurrentGetOperations(null, false);
 	}
 
-	/**
-	 * FLINK-6020
-	 *
-	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
-	 */
 	@Test
 	public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
-		testConcurrentGetOperations(new JobID());
+		testConcurrentGetOperations(new JobID(), false);
+	}
+
+	@Test
+	public void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentGetOperations(new JobID(), true);
 	}
 
-	private void testConcurrentGetOperations(final JobID jobId)
+	/**
+	 * [FLINK-6020] Tests that concurrent get operations don't concurrently access the BlobStore to
+	 * download a blob.
+	 *
+	 * @param jobId
+	 * 		job ID to use (or <tt>null</tt> if job-unrelated)
+	 * @param highAvailability
+	 * 		whether to use permanent (<tt>true</tt>) or transient BLOBs (<tt>false</tt>)
+	 */
+	private void testConcurrentGetOperations(
+			@Nullable final JobID jobId, final boolean highAvailability)
 			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
@@ -271,10 +367,9 @@ public class BlobServerGetTest extends TestLogger {
 		final BlobStore blobStore = mock(BlobStore.class);
 
 		final int numberConcurrentGetOperations = 3;
-		final List<CompletableFuture<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+		final List<CompletableFuture<File>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
 
 		final byte[] data = {1, 2, 3, 4, 99, 42};
-		final ByteArrayInputStream bais = new ByteArrayInputStream(data);
 
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -287,7 +382,7 @@ public class BlobServerGetTest extends TestLogger {
 				public Object answer(InvocationOnMock invocation) throws Throwable {
 					File targetFile = (File) invocation.getArguments()[2];
 
-					FileUtils.copyInputStreamToFile(bais, targetFile);
+					FileUtils.writeByteArrayToFile(targetFile, data);
 
 					return null;
 				}
@@ -296,19 +391,29 @@ public class BlobServerGetTest extends TestLogger {
 
 		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
-		try (final BlobServer blobServer = new BlobServer(config, blobStore)) {
-			for (int i = 0; i < numberConcurrentGetOperations; i++) {
-				CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
-					() -> {
-						try (BlobClient blobClient = blobServer.createClient();
-							InputStream inputStream = getFileHelper(blobClient, jobId, blobKey)) {
-							byte[] buffer = new byte[data.length];
+		try (final BlobServer server = new BlobServer(config, blobStore)) {
+
+			server.start();
 
-							IOUtils.readFully(inputStream, buffer);
+			// upload data first
+			assertEquals(blobKey, put(server, jobId, data, highAvailability));
 
-							return new ByteArrayInputStream(buffer);
+			// now try accessing it concurrently (only HA mode will be able to retrieve it from HA store!)
+			if (highAvailability) {
+				// remove local copy so that a transfer from HA store takes place
+				assertTrue(server.getStorageLocation(jobId, blobKey).delete());
+			}
+			for (int i = 0; i < numberConcurrentGetOperations; i++) {
+				CompletableFuture<File> getOperation = CompletableFuture.supplyAsync(
+					() -> {
+						try {
+							File file = get(server, jobId, blobKey, highAvailability);
+							// check that we have read the right data
+							validateGetAndClose(new FileInputStream(file), data);
+							return file;
 						} catch (IOException e) {
-							throw new CompletionException(new FlinkException("Could not read blob for key " + blobKey + '.', e));
+							throw new CompletionException(new FlinkException(
+								"Could not read blob for key " + blobKey + '.', e));
 						}
 					},
 					executor);
@@ -316,37 +421,63 @@ public class BlobServerGetTest extends TestLogger {
 				getOperations.add(getOperation);
 			}
 
-			CompletableFuture<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
-
-			Collection<InputStream> inputStreams = inputStreamsFuture.get();
-
-			// check that we have read the right data
-			for (InputStream inputStream : inputStreams) {
-				ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
-
-				IOUtils.copy(inputStream, baos);
-
-				baos.close();
-				byte[] input = baos.toByteArray();
-
-				assertArrayEquals(data, input);
-
-				inputStream.close();
-			}
-
-			// verify that we downloaded the requested blob exactly once from the BlobStore
-			verify(blobStore, times(1)).get(eq(jobId), eq(blobKey), any(File.class));
+			CompletableFuture<Collection<File>> filesFuture = FutureUtils.combineAll(getOperations);
+			filesFuture.get();
 		} finally {
 			executor.shutdownNow();
 		}
 	}
 
-	static InputStream getFileHelper(BlobClient blobClient, JobID jobId, BlobKey blobKey)
-		throws IOException {
-		if (jobId == null) {
-			return blobClient.get(blobKey);
+	/**
+	 * Retrieves the given blob.
+	 *
+	 * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
+	 * retrieve the blob.
+	 *
+	 * @param service
+	 * 		BLOB client to use for connecting to the BLOB service
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key
+	 * 		key identifying the BLOB to request
+	 * @param highAvailability
+	 * 		whether to check HA mode accessors
+	 */
+	static File get(
+			BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+			throws IOException {
+		if (highAvailability) {
+			return service.getPermanentBlobStore().getHAFile(jobId, key);
+		} else if (jobId == null) {
+			return service.getTransientBlobStore().getFile(key);
 		} else {
-			return blobClient.get(jobId, blobKey);
+			return service.getTransientBlobStore().getFile(jobId, key);
+		}
+	}
+
+	/**
+	 * Checks that the given blob does not exist anymore by trying to access it.
+	 *
+	 * <p>Note that if a {@link BlobCache} is used, it may try to access the {@link BlobServer} to
+	 * retrieve the blob.
+	 *
+	 * @param service
+	 * 		BLOB client to use for connecting to the BLOB service
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key
+	 * 		key identifying the BLOB to request
+	 * @param highAvailability
+	 * 		whether to check HA mode accessors
+	 */
+	static void verifyDeleted(
+			BlobService service, @Nullable JobID jobId, BlobKey key, boolean highAvailability)
+			throws IOException {
+		try {
+			get(service, jobId, key, highAvailability);
+			fail("File " + jobId + "/" + key + " should have been deleted.");
+		} catch (IOException e) {
+			// expected
 		}
 	}
 }