You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:18:01 UTC

[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..c106b3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +43,7 @@ import org.junit.Test;
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
 
 	/** The buffer size used during the tests in bytes. */
 	private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -63,19 +64,14 @@ public class BlobClientSslTest {
 	 * Starts the SSL enabled BLOB server.
 	 */
 	@BeforeClass
-	public static void startSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SSL_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
 
 		sslClientConfig = new Configuration();
 		sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -87,20 +83,14 @@ public class BlobClientSslTest {
 	 * Starts the SSL disabled BLOB server.
 	 */
 	@BeforeClass
-	public static void startNonSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startNonSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
 		clientConfig = new Configuration();
 		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -113,13 +103,13 @@ public class BlobClientSslTest {
 	 * Shuts the BLOB server down.
 	 */
 	@AfterClass
-	public static void stopServers() {
+	public static void stopServers() throws IOException {
 		if (BLOB_SSL_SERVER != null) {
-			BLOB_SSL_SERVER.shutdown();
+			BLOB_SSL_SERVER.close();
 		}
 
 		if (BLOB_SERVER != null) {
-			BLOB_SERVER.shutdown();
+			BLOB_SERVER.close();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 8f8f8c5..fda4ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -57,24 +57,18 @@ public class BlobClientTest {
 	 * Starts the BLOB server.
 	 */
 	@BeforeClass
-	public static void startServer() {
-		try {
-			blobServiceConfig = new Configuration();
-			BLOB_SERVER = new BlobServer(blobServiceConfig);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startServer() throws IOException {
+		blobServiceConfig = new Configuration();
+		BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore());
 	}
 
 	/**
 	 * Shuts the BLOB server down.
 	 */
 	@AfterClass
-	public static void stopServer() {
+	public static void stopServer() throws IOException {
 		if (BLOB_SERVER != null) {
-			BLOB_SERVER.shutdown();
+			BLOB_SERVER.close();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index f8d50d5..4f12ddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -30,16 +30,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -59,10 +56,20 @@ public class BlobRecoveryITCase extends TestLogger {
 		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
 
-		testBlobServerRecovery(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobServerRecovery(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
-	public static void testBlobServerRecovery(final Configuration config) throws IOException {
+	public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
 		final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
 		String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
 		Random rand = new Random();
@@ -73,7 +80,7 @@ public class BlobRecoveryITCase extends TestLogger {
 
 		try {
 			for (int i = 0; i < server.length; i++) {
-				server[i] = new BlobServer(config);
+				server[i] = new BlobServer(config, blobStore);
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
 			}
 
@@ -166,7 +173,7 @@ public class BlobRecoveryITCase extends TestLogger {
 		finally {
 			for (BlobServer s : server) {
 				if (s != null) {
-					s.shutdown();
+					s.close();
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 025a2ff..e8e28a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -44,10 +44,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteSingle() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -93,10 +94,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAll() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -156,10 +158,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAlreadyDeletedByBlobKey() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -195,10 +198,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAlreadyDeletedByName() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -237,10 +241,11 @@ public class BlobServerDeleteTest {
 
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -289,7 +294,11 @@ public class BlobServerDeleteTest {
 			}
 		}
 		if (server != null) {
-			server.shutdown();
+			try {
+				server.close();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 59a62e1..6d1dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -40,13 +40,13 @@ public class BlobServerGetTest {
 	private final Random rnd = new Random();
 
 	@Test
-	public void testGetFailsDuringLookup() {
+	public void testGetFailsDuringLookup() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -66,37 +66,27 @@ public class BlobServerGetTest {
 			try {
 				client.get(key);
 				fail("This should not succeed.");
-			}
-			catch (IOException e) {
+			} catch (IOException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testGetFailsDuringStreaming() {
+	public void testGetFailsDuringStreaming() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -129,21 +119,12 @@ public class BlobServerGetTest {
 			catch (IOException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c4d6d1c..441ca7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -42,13 +42,13 @@ public class BlobServerPutTest {
 	private final Random rnd = new Random();
 
 	@Test
-	public void testPutBufferSuccessful() {
+	public void testPutBufferSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -95,34 +95,25 @@ public class BlobServerPutTest {
 			BlobUtils.readFully(is3, result3, 0, result3.length, null);
 			is3.close();
 			assertArrayEquals(data, result3);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 
 	@Test
-	public void testPutStreamSuccessful() {
+	public void testPutStreamSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -143,12 +134,7 @@ public class BlobServerPutTest {
 				String stringKey = "my test key";
 				client.put(jid, stringKey, new ByteArrayInputStream(data));
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
 				try {
 					client.close();
@@ -157,19 +143,19 @@ public class BlobServerPutTest {
 				}
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessful() {
+	public void testPutChunkedStreamSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -190,27 +176,18 @@ public class BlobServerPutTest {
 				String stringKey = "my test key";
 				client.put(jid, stringKey, new ChunkedInputStream(data, 17));
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutBufferFails() {
+	public void testPutBufferFails() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -219,7 +196,7 @@ public class BlobServerPutTest {
 		File tempFileDir = null;
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			// make sure the blob server cannot create any files in its storage dir
 			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -250,31 +227,22 @@ public class BlobServerPutTest {
 				// expected
 			}
 
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			// set writable again to make sure we can remove the directory
 			if (tempFileDir != null) {
 				tempFileDir.setWritable(true, false);
 			}
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutNamedBufferFails() {
+	public void testPutNamedBufferFails() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -283,7 +251,7 @@ public class BlobServerPutTest {
 		File tempFileDir = null;
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			// make sure the blob server cannot create any files in its storage dir
 			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -317,25 +285,16 @@ public class BlobServerPutTest {
 			catch (IllegalStateException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			// set writable again to make sure we can remove the directory
 			if (tempFileDir != null) {
 				tempFileDir.setWritable(true, false);
 			}
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..fbcd4a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger {
 	public void testOnEphemeralPort() throws IOException {
 		Configuration conf = new Configuration();
 		conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
-		BlobServer srv = new BlobServer(conf);
-		srv.shutdown();
+		BlobServer srv = new BlobServer(conf, new VoidBlobStore());
+		srv.close();
 	}
 
 	/**
@@ -63,7 +63,7 @@ public class BlobServerRangeTest extends TestLogger {
 
 		// this thing is going to throw an exception
 		try {
-			BlobServer srv = new BlobServer(conf);
+			BlobServer srv = new BlobServer(conf, new VoidBlobStore());
 		} finally {
 			socket.close();
 		}
@@ -92,9 +92,9 @@ public class BlobServerRangeTest extends TestLogger {
 
 		// this thing is going to throw an exception
 		try {
-			BlobServer srv = new BlobServer(conf);
+			BlobServer srv = new BlobServer(conf, new VoidBlobStore());
 			Assert.assertEquals(availablePort, srv.getPort());
-			srv.shutdown();
+			srv.close();
 		} finally {
 			sockets[0].close();
 			sockets[1].close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 93f9b73..91e119b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer {
 
 	private int numFailures;
 
-	public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException {
-		super(config);
+	public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException {
+		super(config, blobStore);
 		this.numFailures = numFailures;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5d9ade3..98e6b3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
@@ -45,7 +44,7 @@ import java.util.List;
 public class BlobLibraryCacheManagerTest {
 
 	@Test
-	public void testLibraryCacheManagerCleanup() {
+	public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException {
 
 		JobID jid = new JobID();
 		List<BlobKey> keys = new ArrayList<BlobKey>();
@@ -56,7 +55,7 @@ public class BlobLibraryCacheManagerTest {
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
@@ -108,14 +107,9 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(2, caughtExceptions);
 
 			bc.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 
 			if (libraryCacheManager != null) {
@@ -130,7 +124,7 @@ public class BlobLibraryCacheManagerTest {
 	}
 
 	@Test
-	public void testRegisterAndDownload() {
+	public void testRegisterAndDownload() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -139,9 +133,9 @@ public class BlobLibraryCacheManagerTest {
 		try {
 			// create the blob transfer services
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			cache = new BlobCache(serverAddress, config);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// upload some meaningless data to the server
 			BlobClient uploader = new BlobClient(serverAddress, config);
@@ -210,22 +204,17 @@ public class BlobLibraryCacheManagerTest {
 			catch (IOException e) {
 				// splendid!
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (cacheDir != null) {
 				if (!cacheDir.setWritable(true, false)) {
 					System.err.println("Could not re-add write permissions to cache directory.");
 				}
 			}
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 54e1a9b..16e3a05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
@@ -63,6 +65,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
 		BlobCache cache = null;
 		BlobLibraryCacheManager libCache = null;
+		BlobStoreService blobStoreService = null;
 
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
@@ -70,8 +73,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
 			for (int i = 0; i < server.length; i++) {
-				server[i] = new BlobServer(config);
+				server[i] = new BlobServer(config, blobStoreService);
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
 				libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
 			}
@@ -89,7 +94,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// The cache
-			cache = new BlobCache(serverAddress[0], config);
+			cache = new BlobCache(serverAddress[0], config, blobStoreService);
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Register uploaded libraries
@@ -110,10 +115,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Shutdown cache and start with other server
-			cache.shutdown();
+			cache.close();
 			libCache.shutdown();
 
-			cache = new BlobCache(serverAddress[1], config);
+			cache = new BlobCache(serverAddress[1], config, blobStoreService);
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Verify key 1
@@ -156,17 +161,21 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		finally {
 			for (BlobServer s : server) {
 				if (s != null) {
-					s.shutdown();
+					s.close();
 				}
 			}
 
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 
 			if (libCache != null) {
 				libCache.shutdown();
 			}
+
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
index 06ffe3c..d89093d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -22,11 +22,11 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -62,7 +62,10 @@ public class ZooKeeperRegistryTest extends TestLogger {
 		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		final HighAvailabilityServices zkHaService = new ZooKeeperHaServices(
-				ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+				ZooKeeperUtils.startCuratorFramework(configuration),
+			Executors.directExecutor(),
+			configuration,
+			new VoidBlobStore());
 
 		final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b8b5984..a63b02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -102,7 +100,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.runtime.BoxedUnit;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -190,7 +187,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				instanceManager,
 				scheduler,
-				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+				new BlobLibraryCacheManager(
+					new BlobServer(
+						flinkConfiguration,
+						testingHighAvailabilityServices.createBlobStore()),
+					3600000L),
 				archive,
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index d6257ba..70800e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -184,7 +185,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+			new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
 			ActorRef.noSender(),
 			new NoRestartStrategy.NoRestartStrategyFactory(),
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0ea47f2..0282a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,17 +18,21 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -64,10 +68,13 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
 
-		highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
+		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config);
+
+		highAvailabilityServices = new ZooKeeperHaServices(
+			client,
 			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+			config,
+			new VoidBlobStore());
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 58f2231..d6fc48c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -97,7 +97,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				taskManagerServices.getMemoryManager(),
 				taskManagerServices.getIOManager(),
 				taskManagerServices.getNetworkEnvironment(),
-				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				highAvailabilityServices,
 				tmRegistry);
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 2a4c036..9dcfc70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -149,9 +149,6 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 
 			network.start();
 
-			LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-				HighAvailabilityServices.DEFAULT_JOB_ID);
-
 			MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
 
 			// create the task manager
@@ -164,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				ioManager,
 				network,
 				numberOfSlots,
-				leaderRetrievalService,
+				highAvailabilityServices,
 				new MetricRegistry(metricRegistryConfiguration));
 
 			taskManager = actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 92de31a..0844aad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -57,6 +58,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -601,7 +603,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	}
 
 	@Test
-	public void testCheckForValidRegistrationSessionIDs() {
+	public void testCheckForValidRegistrationSessionIDs() throws IOException {
 		new JavaTestKit(actorSystem) {{
 
 			ActorGateway taskManagerGateway = null;
@@ -612,6 +614,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class);
 			when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
 				.thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID));
+			when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore());
 
 			try {
 				// we make the test actor (the test kit) the JobManager to intercept

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4be3299..1b9ee48 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
     val components = JobManager.createJobManagerComponents(
       config,
       executor,
-      executor)
+      executor,
+      highAvailabilityServices.createBlobStore())
 
     // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService.
     // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 09dc5ed..1db0a85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -32,15 +32,15 @@ import scala.language.postfixOps
 /** Subclass of the [[TaskManager]] to support testing messages
  */
 class TestingTaskManager(
-                          config: TaskManagerConfiguration,
-                          resourceID: ResourceID,
-                          connectionInfo: TaskManagerLocation,
-                          memoryManager: MemoryManager,
-                          ioManager: IOManager,
-                          network: NetworkEnvironment,
-                          numberOfSlots: Int,
-                          leaderRetrievalService: LeaderRetrievalService,
-                          metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
     resourceID,
@@ -49,19 +49,19 @@ class TestingTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 
   def this(
-            config: TaskManagerConfiguration,
-            connectionInfo: TaskManagerLocation,
-            memoryManager: MemoryManager,
-            ioManager: IOManager,
-            network: NetworkEnvironment,
-            numberOfSlots: Int,
-            leaderRetrievalService: LeaderRetrievalService,
-            metricRegistry : MetricRegistry) {
+    config: TaskManagerConfiguration,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry) {
     this(
       config,
       ResourceID.generate(),
@@ -70,7 +70,7 @@ class TestingTaskManager(
       ioManager,
       network,
       numberOfSlots,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 5f9d178..2983d66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -155,6 +155,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath());
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 0f82faa..1df4b8d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
@@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   * @param ioManager IOManager responsible for I/O
   * @param network NetworkEnvironment for this actor
   * @param numberOfSlots Number of slots for this TaskManager
-  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
-  *                              JobManager
+  * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval
+  *                                service for retrieving the leading JobManager
   */
 class TestingYarnTaskManager(
-                              config: TaskManagerConfiguration,
-                              resourceID: ResourceID,
-                              connectionInfo: TaskManagerLocation,
-                              memoryManager: MemoryManager,
-                              ioManager: IOManager,
-                              network: NetworkEnvironment,
-                              numberOfSlots: Int,
-                              leaderRetrievalService: LeaderRetrievalService,
-                              metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends YarnTaskManager(
     config,
     resourceID,
@@ -61,7 +62,7 @@ class TestingYarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index e9c3904..f81d040 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.FileSystemBlobStore;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	 * HA services clean up */
 	protected final Path haDataDirectory;
 
+	/** Blob store service to be used for the BlobServer and BlobCache */
+	protected final BlobStoreService blobStoreService;
+
 	/** Flag marking this instance as shut down */
 	private volatile boolean closed;
 
@@ -153,6 +157,8 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 		}
 
 		LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+
+		blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
 	}
 
 	// ------------------------------------------------------------------------
@@ -163,7 +169,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	public BlobStore createBlobStore() throws IOException {
 		enter();
 		try {
-			return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+			return blobStoreService;
 		} finally {
 			exit();
 		}
@@ -192,11 +198,23 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 			}
 			closed = true;
 
+			Throwable exception = null;
+
+			try {
+				blobStoreService.close();
+			} catch (Throwable t) {
+				exception = t;
+			}
+
 			// we do not propagate exceptions here, but only log them
 			try {
 				hadoopFileSystem.close();
 			} catch (Throwable t) {
-				LOG.warn("Error closing Hadoop FileSystem", t);
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+
+			if (exception != null) {
+				ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices.");
 			}
 		}
 		finally {
@@ -213,12 +231,18 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 			// we remember exceptions only, then continue cleanup, and re-throw at the end
 			Throwable exception = null;
 
+			try {
+				blobStoreService.closeAndCleanupAllData();
+			} catch (Throwable t) {
+				exception = t;
+			}
+
 			// first, we delete all data in Flink's data directory
 			try {
 				flinkFileSystem.delete(haDataDirectory, true);
 			}
 			catch (Throwable t) {
-				exception = t;
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
 			}
 
 			// now we actually close the services

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index be31085..b7f4c9a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
@@ -38,7 +38,7 @@ class YarnTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class YarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {