You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:17:59 UTC

[1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results

Repository: flink
Updated Branches:
  refs/heads/release-1.3 4104409cc -> 60873b0c5


[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future values
are discarded, making it more efficient than the ResultConjunctFuture, which returns
the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection<Future>).

This closes #3873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6c9654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6c9654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6c9654

Branch: refs/heads/release-1.3
Commit: 9c6c9654e11dd31fa2323977fc02961811d6e518
Parents: 4104409
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 11 17:36:17 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:16:50 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 131 +++++++++++++++----
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java      |   4 +-
 .../executiongraph/failover/FailoverRegion.java |   2 +-
 .../runtime/concurrent/FutureUtilsTest.java     |  83 ++++++++++--
 5 files changed, 184 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4948147..a27af56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,8 +109,9 @@ public class FutureUtils {
 
 	/**
 	 * Creates a future that is complete once multiple other futures completed. 
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * The future fails (completes exceptionally) once one of the futures in the
+	 * conjunction fails. Upon successful completion, the future returns the
+	 * collection of the futures' results.
 	 *
 	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
 	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
@@ -115,16 +119,16 @@ public class FutureUtils {
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
 	 */
-	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
 		checkNotNull(futures, "futures");
 
-		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
 
 		if (futures.isEmpty()) {
-			conjunct.complete(null);
+			conjunct.complete(Collections.<T>emptyList());
 		}
 		else {
-			for (Future<?> future : futures) {
+			for (Future<? extends T> future : futures) {
 				future.handle(conjunct.completionHandler);
 			}
 		}
@@ -133,16 +137,32 @@ public class FutureUtils {
 	}
 
 	/**
+	 * Creates a future that is complete once all of the given futures have completed.
+	 * The future fails (completes exceptionally) once one of the given futures
+	 * fails.
+	 *
+	 * <p>The ConjunctFuture gives access to how many Futures have already
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
+	 * @param futures The futures to wait on. No null entries are allowed.
+	 * @return The WaitingFuture that completes once all given futures are complete (or one fails).
+	 */
+	public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) {
+		checkNotNull(futures, "futures");
+
+		return new WaitingConjunctFuture(futures);
+	}
+
+	/**
 	 * A future that is complete once multiple other futures completed. The futures are not
-	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
+	 * one of the Futures in the conjunction fails.
 	 * 
 	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
 	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
 	 * many of the Futures are already complete.
 	 */
-	public interface ConjunctFuture extends CompletableFuture<Void> {
+	public interface ConjunctFuture<T> extends CompletableFuture<T> {
 
 		/**
 		 * Gets the total number of Futures in the conjunction.
@@ -158,39 +178,102 @@ public class FutureUtils {
 	}
 
 	/**
-	 * The implementation of the {@link ConjunctFuture}.
-	 * 
-	 * <p>Implementation notice: The member fields all have package-private access, because they are
-	 * either accessed by an inner subclass or by the enclosing class.
+	 * The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection.
 	 */
-	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
+	private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> {
 
 		/** The total number of futures in the conjunction */
-		final int numTotal;
+		private final int numTotal;
+
+		/** The next free index in the results arrays */
+		private final AtomicInteger nextIndex = new AtomicInteger(0);
 
 		/** The number of futures in the conjunction that are already complete */
-		final AtomicInteger numCompleted = new AtomicInteger();
+		private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+		/** The set of collected results so far */
+		private volatile T[] results;
 
 		/** The function that is attached to all futures in the conjunction. Once a future
-		 * is complete, this function tracks the completion or fails the conjunct.  
+		 * is complete, this function tracks the completion or fails the conjunct.
 		 */
-		final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
+		final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {
 
 			@Override
-			public Void apply(Object o, Throwable throwable) {
+			public Void apply(T o, Throwable throwable) {
 				if (throwable != null) {
 					completeExceptionally(throwable);
-				}
-				else if (numTotal == numCompleted.incrementAndGet()) {
-					complete(null);
+				} else {
+					int index = nextIndex.getAndIncrement();
+
+					results[index] = o;
+
+					if (numCompleted.incrementAndGet() == numTotal) {
+						complete(Arrays.asList(results));
+					}
 				}
 
 				return null;
 			}
 		};
 
-		ConjunctFutureImpl(int numTotal) {
+		@SuppressWarnings("unchecked")
+		ResultConjunctFuture(int numTotal) {
 			this.numTotal = numTotal;
+			results = (T[])new Object[numTotal];
+		}
+
+		@Override
+		public int getNumFuturesTotal() {
+			return numTotal;
+		}
+
+		@Override
+		public int getNumFuturesCompleted() {
+			return numCompleted.get();
+		}
+	}
+
+	/**
+	 * Implementation of the {@link ConjunctFuture} interface which waits only for the completion
+	 * of its futures and does not return their values.
+	 */
+	private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void> implements ConjunctFuture<Void> {
+
+		/** Number of completed futures */
+		private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+		/** Total number of futures to wait on */
+		private final int numTotal;
+
+		/** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
+		private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
+			@Override
+			public Void apply(Object o, Throwable throwable) {
+				if (throwable == null) {
+					if (numTotal == numCompleted.incrementAndGet()) {
+						complete(null);
+					}
+				} else {
+					completeExceptionally(throwable);
+				}
+
+				return null;
+			}
+		};
+
+		private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
+			Preconditions.checkNotNull(futures, "Futures must not be null.");
+
+			this.numTotal = futures.size();
+
+			if (futures.isEmpty()) {
+				complete(null);
+			} else {
+				for (Future<?> future : futures) {
+					future.handle(completionHandler);
+				}
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5eaa637..7c13936 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -871,7 +871,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			// this future is complete once all slot futures are complete.
 			// the future fails once one slot future fails.
-			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
+			final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures);
 
 			// make sure that we fail if the allocation timeout was exceeded
 			final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
@@ -892,7 +892,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
 
 				@Override
-				public Void apply(Void ignored, Throwable throwable) {
+				public Void apply(Void slots, Throwable throwable) {
 					try {
 						// we do not need the cancellation timeout any more
 						timeoutCancelHandle.cancel(false);
@@ -973,7 +973,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					}
 
 					// we build a future that is complete once all vertices have reached a terminal state
-					final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 					allTerminal.thenAccept(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {
@@ -1102,7 +1102,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					futures.add(ejv.cancelWithFuture());
 				}
 
-				final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+				final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 				allTerminal.thenAccept(new AcceptFunction<Void>() {
 					@Override
 					public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3a98e0a..f5a592a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -509,7 +509,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	public Future<Void> cancelWithFuture() {
 		// we collect all futures from the task cancellations
-		ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+		ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism);
 
 		// cancel each vertex
 		for (ExecutionVertex ev : getTaskVertices()) {
@@ -517,7 +517,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 
 		// return a conjunct future, which is complete once all individual tasks are canceled
-		return FutureUtils.combineAll(futures);
+		return FutureUtils.waitForAll(futures);
 	}
 
 	public void fail(Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index b36cfcf..6066c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -150,7 +150,7 @@ public class FailoverRegion {
 						futures.add(vertex.cancel());
 					}
 
-					final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 					allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 43710cb..e262459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -21,10 +21,15 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
@@ -33,17 +38,26 @@ import static org.junit.Assert.*;
 /**
  * Tests for the utility methods in {@link FutureUtils}
  */
-public class FutureUtilsTest {
+@RunWith(Parameterized.class)
+public class FutureUtilsTest extends TestLogger{
+
+	@Parameterized.Parameters
+	public static Collection<FutureFactory> parameters (){
+		return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
+	}
+
+	@Parameterized.Parameter
+	public FutureFactory futureFactory;
 
 	@Test
 	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
 		try {
-			FutureUtils.combineAll(null);
+			futureFactory.createFuture(null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			FutureUtils.combineAll(Arrays.asList(
+			futureFactory.createFuture(Arrays.asList(
 					new FlinkCompletableFuture<Object>(),
 					null,
 					new FlinkCompletableFuture<Object>()));
@@ -63,11 +77,11 @@ public class FutureUtilsTest {
 		future2.complete(new Object());
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		assertEquals(4, result.getNumFuturesTotal());
@@ -108,11 +122,11 @@ public class FutureUtilsTest {
 		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		assertEquals(4, result.getNumFuturesTotal());
@@ -150,12 +164,12 @@ public class FutureUtilsTest {
 		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 		assertEquals(4, result.getNumFuturesTotal());
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		future1.complete(new Object());
@@ -183,12 +197,55 @@ public class FutureUtilsTest {
 		}
 	}
 
+	/**
+	 * Tests that the conjunct future returns upon completion the collection of all future values
+	 */
+	@Test
+	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> future1 = FlinkCompletableFuture.completed(1);
+		CompletableFuture<Long> future2 = FlinkCompletableFuture.completed(2L);
+		CompletableFuture<Double> future3 = new FlinkCompletableFuture<>();
+
+		ConjunctFuture<Collection<Number>> result = FutureUtils.<Number>combineAll(Arrays.asList(future1, future2, future3));
+
+		assertFalse(result.isDone());
+
+		future3.complete(.1);
+
+		assertTrue(result.isDone());
+
+		assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+	}
+
 	@Test
 	public void testConjunctOfNone() throws Exception {
-		final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<Future<Object>>emptyList());
 
 		assertEquals(0, result.getNumFuturesTotal());
 		assertEquals(0, result.getNumFuturesCompleted());
 		assertTrue(result.isDone());
 	}
+
+	/**
+	 * Factory to create {@link ConjunctFuture} for testing.
+	 */
+	private interface FutureFactory {
+		ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures);
+	}
+
+	private static class ConjunctFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+			return FutureUtils.combineAll(futures);
+		}
+	}
+
+	private static class WaitingFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+			return FutureUtils.waitForAll(futures);
+		}
+	}
 }


[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

Posted by tr...@apache.org.
[FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

In order to store completed checkpoints in an increasing order in ZooKeeper,
the path for the completed checkpoint is now generated by
String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId).
This makes sure that the converted long will always have the same length with
leading 0s.

Fix failing ZooKeeperCompletedCheckpointStoreITCase

This closes #3884.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/827d74e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/827d74e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/827d74e6

Branch: refs/heads/release-1.3
Commit: 827d74e69386cff87576972c1b69a16b92b730ae
Parents: 9c6c965
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 12 14:23:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:16:54 2017 +0200

----------------------------------------------------------------------
 .../ZooKeeperCompletedCheckpointStore.java      |  6 ++--
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +++++++++++++++++---
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c8c68bc..95cfb0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	 * @param checkpointId to convert to the path
 	 * @return Path created from the given checkpoint id
 	 */
-	protected static String checkpointIdToPath(long checkpointId) {
-		return String.format("/%s", checkpointId);
+	public static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%019d", checkpointId);
 	}
 
 	/**
@@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	 * @param path in ZooKeeper
 	 * @return Checkpoint id parsed from the path
 	 */
-	protected static long pathToCheckpointId(String path) {
+	public static long pathToCheckpointId(String path) {
 		try {
 			String numberString;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73e0ed9..3fd7f1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.recover();
 
@@ -161,12 +161,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		// Recover again
 		store.recover();
@@ -175,6 +175,32 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(checkpoint, recovered);
 	}
 
+	/**
+	 * FLINK-6284
+	 *
+	 * Tests that the latest recovered checkpoint is the one with the highest checkpoint id
+	 */
+	@Test
+	public void testLatestCheckpointRecovery() throws Exception {
+		final int numCheckpoints = 3;
+		AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
+		List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
+
+		checkpoints.add(createCheckpoint(9));
+		checkpoints.add(createCheckpoint(10));
+		checkpoints.add(createCheckpoint(11));
+
+		for (CompletedCheckpoint checkpoint : checkpoints) {
+			checkpointStore.addCheckpoint(checkpoint);
+		}
+
+		checkpointStore.recover();
+
+		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
+
+		assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
+	}
+
 	static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
 		private static final long serialVersionUID = -268548467968932L;


[4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

Posted by tr...@apache.org.
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

The HighAvailabilityService creates a single BlobStoreService instance which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle
is exclusively managed by the HighAvailabilityServices. This means that the
BlobStore's content is only cleaned up if the HighAvailabilityService's HA data
is cleaned up. Having this single point of control makes it easier to decide when
to discard HA data (e.g. in case of a successful job execution) and when to retain
the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods

This closes #3864.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ea89a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ea89a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ea89a9

Branch: refs/heads/release-1.3
Commit: e3ea89a9fab39e7595c466bcd90c30c338c86f4e
Parents: 827d74e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 9 10:26:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:16:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    | 14 ++-
 .../clusterframework/MesosTaskManager.scala     |  6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +++++-
 .../handlers/TaskManagerLogHandler.java         | 11 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java     | 14 ++-
 .../handlers/TaskManagerLogHandlerTest.java     | 11 ++-
 .../apache/flink/runtime/blob/BlobCache.java    | 80 ++++-------------
 .../apache/flink/runtime/blob/BlobServer.java   | 38 ++++----
 .../apache/flink/runtime/blob/BlobService.java  |  8 +-
 .../apache/flink/runtime/blob/BlobStore.java    | 26 +-----
 .../flink/runtime/blob/BlobStoreService.java    | 32 +++++++
 .../apache/flink/runtime/blob/BlobUtils.java    | 44 +++++++--
 .../org/apache/flink/runtime/blob/BlobView.java | 49 +++++++++++
 .../flink/runtime/blob/FileSystemBlobStore.java | 11 ++-
 .../flink/runtime/blob/VoidBlobStore.java       |  8 +-
 .../apache/flink/runtime/client/JobClient.java  | 13 ++-
 .../runtime/client/JobListeningContext.java     |  6 +-
 .../clusterframework/BootstrapTools.java        |  8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../HighAvailabilityServicesUtils.java          | 12 ++-
 .../nonha/AbstractNonHaServices.java            |  5 +-
 .../zookeeper/ZooKeeperHaServices.java          | 93 ++++++++++----------
 .../runtime/jobmaster/JobManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java     | 21 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 21 ++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  8 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 36 +++++---
 .../runtime/blob/BlobCacheRetriesTest.java      | 79 ++++++++---------
 .../runtime/blob/BlobCacheSuccessTest.java      | 56 ++++++------
 .../flink/runtime/blob/BlobClientSslTest.java   | 52 +++++------
 .../flink/runtime/blob/BlobClientTest.java      | 16 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 21 +++--
 .../runtime/blob/BlobServerDeleteTest.java      | 21 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 41 +++------
 .../flink/runtime/blob/BlobServerPutTest.java   | 89 +++++--------------
 .../flink/runtime/blob/BlobServerRangeTest.java | 10 +--
 .../runtime/blob/TestingFailingBlobServer.java  |  4 +-
 .../BlobLibraryCacheManagerTest.java            | 33 +++----
 .../BlobLibraryCacheRecoveryITCase.java         | 21 +++--
 .../zookeeper/ZooKeeperRegistryTest.java        |  7 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  9 +-
 .../JobManagerLeaderElectionTest.java           |  3 +-
 .../ZooKeeperLeaderRetrievalTest.java           | 13 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  5 +-
 .../TaskManagerRegistrationTest.java            |  5 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala |  3 +-
 .../testingUtils/TestingTaskManager.scala       | 40 ++++-----
 ...agerHAProcessFailureBatchRecoveryITCase.java |  1 +
 .../flink/yarn/TestingYarnTaskManager.scala     | 25 +++---
 .../YarnHighAvailabilityServices.java           | 30 ++++++-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  6 +-
 55 files changed, 667 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 8a3f662..0815863 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.blob.BlobRecoveryITCase;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
@@ -234,7 +236,17 @@ public class HDFSTest {
 		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobRecoveryITCase.testBlobServerRecovery(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	// package visible

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index e8d6a58..7834639 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -38,7 +38,7 @@ class MesosTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class MesosTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f83fa27..5c66545 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -147,6 +148,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	public WebRuntimeMonitor(
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
+			BlobView blobView,
 			ActorSystem actorSystem) throws IOException, InterruptedException {
 
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
@@ -279,10 +281,26 @@ public class WebRuntimeMonitor implements WebMonitor {
 		GET(router, new JobMetricsHandler(metricFetcher));
 
 		GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-				TaskManagerLogHandler.FileMode.LOG, config, enableSSL));
-		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-				TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL));
+		GET(router,
+			new TaskManagerLogHandler(
+				retriever,
+				context,
+				jobManagerAddressPromise.future(),
+				timeout,
+				TaskManagerLogHandler.FileMode.LOG,
+				config,
+				enableSSL,
+				blobView));
+		GET(router,
+			new TaskManagerLogHandler(
+				retriever,
+				context,
+				jobManagerAddressPromise.future(),
+				timeout,
+				TaskManagerLogHandler.FileMode.STDOUT,
+				config,
+				enableSSL,
+				blobView));
 		GET(router, new TaskManagerMetricsHandler(metricFetcher));
 
 		router

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 37ee814..53ee336 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -50,6 +50,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 
 	private final Time timeTimeout;
 
+	private final BlobView blobView;
+
 	public enum FileMode {
 		LOG,
 		STDOUT
@@ -128,7 +132,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		FiniteDuration timeout,
 		FileMode fileMode,
 		Configuration config,
-		boolean httpsEnabled) {
+		boolean httpsEnabled,
+		BlobView blobView) {
 		super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled);
 
 		this.executor = checkNotNull(executor);
@@ -142,6 +147,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 				break;
 		}
 
+		this.blobView = Preconditions.checkNotNull(blobView, "blobView");
+
 		timeTimeout = Time.milliseconds(timeout.toMillis());
 	}
 
@@ -167,7 +174,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 					Option<String> hostOption = jobManager.actor().path().address().host();
 					String host = hostOption.isDefined() ? hostOption.get() : "localhost";
 					int port = (int) result;
-					return new BlobCache(new InetSocketAddress(host, port), config);
+					return new BlobCache(new InetSocketAddress(host, port), config, blobView);
 				}
 			}, executor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5ccfe90..b418141 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -153,6 +154,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				webMonitor[i] = new WebRuntimeMonitor(
 					config,
 					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+					highAvailabilityServices.createBlobStore(),
 					jobManagerSystem[i]);
 			}
 
@@ -293,9 +295,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();
 
-			LeaderRetrievalService leaderRetrievalService = mock(LeaderRetrievalService.class);
 			webRuntimeMonitor = new WebRuntimeMonitor(
-					config, leaderRetrievalService, actorSystem);
+				config,
+				mock(LeaderRetrievalService.class),
+				mock(BlobView.class),
+				actorSystem);
 
 			webRuntimeMonitor.start("akka://schmakka");
 
@@ -466,10 +470,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
+		HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices();
+
 		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
 			config,
-			flink.highAvailabilityServices().getJobManagerLeaderRetriever(
-				HighAvailabilityServices.DEFAULT_JOB_ID),
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+			highAvailabilityServices.createBlobStore(),
 			jmActorSystem);
 
 		webMonitor.start(jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 4177f44..3d8f1a3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -53,7 +54,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.any;
@@ -71,7 +71,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 		String[] pathsLog = handlerLog.getPaths();
 		Assert.assertEquals(1, pathsLog.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
@@ -83,7 +84,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.STDOUT,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 		String[] pathsOut = handlerOut.getPaths();
 		Assert.assertEquals(1, pathsOut.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]);
@@ -131,7 +133,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 
 		final AtomicReference<String> exception = new AtomicReference<>();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 2587b15..2eb103a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
@@ -58,7 +57,7 @@ public final class BlobCache implements BlobService {
 	private final File storageDir;
 
 	/** Blob store for distributed file storage, e.g. in HA */
-	private final BlobStore blobStore;
+	private final BlobView blobView;
 
 	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
@@ -78,55 +77,19 @@ public final class BlobCache implements BlobService {
 	 * 		address of the {@link BlobServer} to use for fetching files from
 	 * @param blobClientConfig
 	 * 		global configuration
-	 *
-	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or
-	 * 		is not usable
-	 */
-	public BlobCache(InetSocketAddress serverAddress,
-			Configuration blobClientConfig) throws IOException {
-		this(serverAddress, blobClientConfig,
-			BlobUtils.createBlobStoreFromConfig(blobClientConfig));
-	}
-
-	/**
-	 * Instantiates a new BLOB cache.
-	 *
-	 * @param serverAddress
-	 * 		address of the {@link BlobServer} to use for fetching files from
-	 * @param blobClientConfig
-	 * 		global configuration
-	 * 	@param haServices
-	 * 		high availability services able to create a distributed blob store
-	 *
-	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or
-	 * 		is not usable
-	 */
-	public BlobCache(InetSocketAddress serverAddress,
-		Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException {
-		this(serverAddress, blobClientConfig, haServices.createBlobStore());
-	}
-
-	/**
-	 * Instantiates a new BLOB cache.
-	 *
-	 * @param serverAddress
-	 * 		address of the {@link BlobServer} to use for fetching files from
-	 * @param blobClientConfig
-	 * 		global configuration
-	 * @param blobStore
+	 * @param blobView
 	 * 		(distributed) blob store file system to retrieve files from first
 	 *
 	 * @throws IOException
 	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
 	 */
-	private BlobCache(
-			final InetSocketAddress serverAddress, final Configuration blobClientConfig,
-			final BlobStore blobStore) throws IOException {
+	public BlobCache(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView) throws IOException {
 		this.serverAddress = checkNotNull(serverAddress);
 		this.blobClientConfig = checkNotNull(blobClientConfig);
-		this.blobStore = blobStore;
+		this.blobView = checkNotNull(blobView, "blobStore");
 
 		// configure and create the storage directory
 		String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -169,7 +132,7 @@ public final class BlobCache implements BlobService {
 
 		// first try the distributed blob store (if available)
 		try {
-			blobStore.get(requiredBlob, localJarFile);
+			blobView.get(requiredBlob, localJarFile);
 		} catch (Exception e) {
 			LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
 		}
@@ -294,28 +257,23 @@ public final class BlobCache implements BlobService {
 	}
 
 	@Override
-	public void shutdown() {
+	public void close() throws IOException {
 		if (shutdownRequested.compareAndSet(false, true)) {
 			LOG.info("Shutting down BlobCache");
 
 			// Clean up the storage directory
 			try {
 				FileUtils.deleteDirectory(storageDir);
-			}
-			catch (IOException e) {
-				LOG.error("BLOB cache failed to properly clean up its storage directory.");
-			}
-
-			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
-			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-				try {
-					Runtime.getRuntime().removeShutdownHook(shutdownHook);
-				}
-				catch (IllegalStateException e) {
-					// race, JVM is in shutdown already, we can safely ignore this
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+			} finally {
+				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					} catch (IllegalStateException e) {
+						// race, JVM is in shutdown already, we can safely ignore this
+					} catch (Throwable t) {
+						LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 8a70559..a006981 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -94,19 +94,14 @@ public class BlobServer extends Thread implements BlobService {
 	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 *
+	 * @param config Configuration to be used to instantiate the BlobServer
+	 * @param blobStore BlobStore to store blobs persistently
+	 *
 	 * @throws IOException
 	 * 		thrown if the BLOB server cannot bind to a free network port or if the
 	 * 		(local or distributed) file storage cannot be created or is not usable
 	 */
-	public BlobServer(Configuration config) throws IOException {
-		this(config, BlobUtils.createBlobStoreFromConfig(config));
-	}
-
-	public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
-		this(config, haServices.createBlobStore());
-	}
-
-	private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
+	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
 		this.blobServiceConfiguration = checkNotNull(config);
 		this.blobStore = checkNotNull(blobStore);
 
@@ -271,7 +266,12 @@ public class BlobServer extends Thread implements BlobService {
 		catch (Throwable t) {
 			if (!this.shutdownRequested.get()) {
 				LOG.error("BLOB server stopped working. Shutting down", t);
-				shutdown();
+
+				try {
+					close();
+				} catch (Throwable closeThrowable) {
+					LOG.error("Could not properly close the BlobServer.", closeThrowable);
+				}
 			}
 		}
 	}
@@ -280,13 +280,15 @@ public class BlobServer extends Thread implements BlobService {
 	 * Shuts down the BLOB server.
 	 */
 	@Override
-	public void shutdown() {
+	public void close() throws IOException {
 		if (shutdownRequested.compareAndSet(false, true)) {
+			Exception exception = null;
+
 			try {
 				this.serverSocket.close();
 			}
 			catch (IOException ioe) {
-				LOG.debug("Error while closing the server socket.", ioe);
+				exception = ioe;
 			}
 
 			// wake the thread up, in case it is waiting on some operation
@@ -296,13 +298,15 @@ public class BlobServer extends Thread implements BlobService {
 				join();
 			}
 			catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
+
 				LOG.debug("Error while waiting for this thread to die.", ie);
 			}
 
 			synchronized (activeConnections) {
 				if (!activeConnections.isEmpty()) {
 					for (BlobServerConnection conn : activeConnections) {
-						LOG.debug("Shutting down connection " + conn.getName());
+						LOG.debug("Shutting down connection {}.", conn.getName());
 						conn.close();
 					}
 					activeConnections.clear();
@@ -314,7 +318,7 @@ public class BlobServer extends Thread implements BlobService {
 				FileUtils.deleteDirectory(storageDir);
 			}
 			catch (IOException e) {
-				LOG.error("BLOB server failed to properly clean up its storage directory.");
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
 			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
@@ -327,13 +331,15 @@ public class BlobServer extends Thread implements BlobService {
 					// race, JVM is in shutdown already, we can safely ignore this
 				}
 				catch (Throwable t) {
-					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
+					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
 				}
 			}
 
 			if(LOG.isInfoEnabled()) {
 				LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
 			}
+
+			ExceptionUtils.tryRethrowIOException(exception);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 419ee8d..97a2d51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URL;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
  */
-public interface BlobService {
+public interface BlobService extends Closeable {
 
 	/**
 	 * This method returns the URL of the file associated with the provided blob key.
@@ -49,11 +50,6 @@ public interface BlobService {
 	 * @return the port of the blob service.
 	 */
 	int getPort();
-
-	/**
-	 * Shutdown method which is called to terminate the blob service.
-	 */
-	void shutdown();
 	
 	BlobClient createClient() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 64dc942..4c26a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store.
  */
-public interface BlobStore {
+public interface BlobStore extends BlobView {
 
 	/**
 	 * Copies the local file to the blob store.
@@ -50,25 +50,6 @@ public interface BlobStore {
 	void put(File localFile, JobID jobId, String key) throws IOException;
 
 	/**
-	 * Copies a blob to a local file.
-	 *
-	 * @param blobKey   The blob ID
-	 * @param localFile The local file to copy to
-	 * @throws IOException If the copy fails
-	 */
-	void get(BlobKey blobKey, File localFile) throws IOException;
-
-	/**
-	 * Copies a blob to a local file.
-	 *
-	 * @param jobId     The JobID part of ID for the blob
-	 * @param key       The String part of ID for the blob
-	 * @param localFile The local file to copy to
-	 * @throws IOException If the copy fails
-	 */
-	void get(JobID jobId, String key, File localFile) throws IOException;
-
-	/**
 	 * Tries to delete a blob from storage.
 	 *
 	 * <p>NOTE: This also tries to delete any created directories if empty.</p>
@@ -95,9 +76,4 @@ public interface BlobStore {
 	 * @param jobId The JobID part of all blobs to delete
 	 */
 	void deleteAll(JobID jobId);
-
-	/**
-	 * Cleans up the store and deletes all blobs.
-	 */
-	void cleanUp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
new file mode 100644
index 0000000..83cd9d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import java.io.Closeable;
+
+/**
+ * Service interface for the BlobStore which allows closing the store and cleaning up its data.
+ */
+public interface BlobStoreService extends BlobStore, Closeable {
+
+	/**
+	 * Closes and cleans up the store. This entails the deletion of all blobs.
+	 */
+	void closeAndCleanupAllData();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 3c14f2f..8da362d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -22,8 +22,10 @@ import com.google.common.io.BaseEncoding;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
@@ -41,6 +43,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * Utility class to work with blob data.
@@ -78,18 +81,49 @@ public class BlobUtils {
 	 * @throws IOException
 	 * 		thrown if the (distributed) file storage cannot be created
 	 */
-	static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
+	public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
 		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
 
 		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			return new VoidBlobStore();
 		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
-			return ZooKeeperHaServices.createBlobStore(config);
+			return createFileSystemBlobStore(config);
 		} else {
 			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
 		}
 	}
 
+	private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException {
+		String storagePath = configuration.getValue(
+			HighAvailabilityOptions.HA_STORAGE_PATH);
+		if (isNullOrWhitespaceOnly(storagePath)) {
+			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+				HighAvailabilityOptions.HA_STORAGE_PATH);
+		}
+
+		final Path path;
+		try {
+			path = new Path(storagePath);
+		} catch (Exception e) {
+			throw new IOException("Invalid path for highly available storage (" +
+				HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final FileSystem fileSystem;
+		try {
+			fileSystem = path.getFileSystem();
+		} catch (Exception e) {
+			throw new IOException("Could not create FileSystem for highly available storage (" +
+				HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final String clusterId =
+			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+		storagePath += "/" + clusterId;
+
+		return new FileSystemBlobStore(fileSystem, storagePath);
+	}
+
 	/**
 	 * Creates a storage directory for a blob service.
 	 *
@@ -246,10 +280,10 @@ public class BlobUtils {
 			@Override
 			public void run() {
 				try {
-					service.shutdown();
+					service.close();
 				}
 				catch (Throwable t) {
-					logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t);
+					logger.error("Error during shutdown of blob service via JVM shutdown hook.", t);
 				}
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
new file mode 100644
index 0000000..11cf011
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * View on blobs stored in a {@link BlobStore}.
+ */
+public interface BlobView {
+
+	/**
+	 * Copies the blob identified by the given blob key to a local file.
+	 *
+	 * @param blobKey   The blob ID
+	 * @param localFile The local file to copy to
+	 * @throws IOException If the copy fails
+	 */
+	void get(BlobKey blobKey, File localFile) throws IOException;
+
+	/**
+	 * Copies the blob identified by a job ID and a string key to a local file.
+	 *
+	 * @param jobId     The JobID part of the blob's ID
+	 * @param key       The String part of the blob's ID
+	 * @param localFile The local file to copy to
+	 * @throws IOException If the copy fails
+	 */
+	void get(JobID jobId, String key, File localFile) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 7cfce7a..b54756c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -41,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>This is used in addition to the local blob storage for high availability.
  */
-public class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStoreService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
 
@@ -157,14 +157,19 @@ public class FileSystemBlobStore implements BlobStore {
 	}
 
 	@Override
-	public void cleanUp() {
+	public void closeAndCleanupAllData() {
 		try {
 			LOG.debug("Cleaning up {}.", basePath);
 
 			fileSystem.delete(new Path(basePath), true);
 		}
 		catch (Exception e) {
-			LOG.error("Failed to clean up recovery directory.");
+			LOG.error("Failed to clean up recovery directory.", e);
 		}
 	}
+
+	@Override
+	public void close() throws IOException {
+		// nothing to do for the FileSystemBlobStore
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 8606844..c14d082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store doing nothing.
  */
-public class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStoreService {
 
 	@Override
 	public void put(File localFile, BlobKey blobKey) throws IOException {
@@ -57,6 +57,8 @@ public class VoidBlobStore implements BlobStore {
 	}
 
 	@Override
-	public void cleanUp() {
-	}
+	public void closeAndCleanupAllData() {}
+
+	@Override
+	public void close() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index b570383..86d927a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -191,7 +191,8 @@ public class JobClient {
 	public static ClassLoader retrieveClassLoader(
 		JobID jobID,
 		ActorGateway jobManager,
-		Configuration config)
+		Configuration config,
+		HighAvailabilityServices highAvailabilityServices)
 		throws JobRetrievalException {
 
 		final Object jmAnswer;
@@ -213,7 +214,8 @@ public class JobClient {
 			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
 			final BlobCache blobClient;
 			try {
-				blobClient = new BlobCache(serverAddress, config);
+				// TODO: Fix the lifecycle of the BlobCache so that it is properly closed after use
+				blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
 			} catch (IOException e) {
 				throw new JobRetrievalException(jobID,
 					"Failed to setup blob cache", e);
@@ -229,7 +231,12 @@ public class JobClient {
 				try {
 					allURLs[pos++] = blobClient.getURL(blobKey);
 				} catch (Exception e) {
-					blobClient.shutdown();
+					try {
+						blobClient.close();
+					} catch (IOException ioe) {
+						LOG.warn("Could not properly close the BlobClient.", ioe);
+					}
+
 					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index fe8c34c..bb448be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -134,7 +134,11 @@ public final class JobListeningContext {
 	public ClassLoader getClassLoader() throws JobRetrievalException {
 		if (classLoader == null) {
 			// lazily initializes the class loader when it is needed
-			classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration);
+			classLoader = JobClient.retrieveClassLoader(
+				jobID,
+				getJobManager(),
+				configuration,
+				highAvailabilityServices);
 			LOG.info("Reconstructed class loader for Job {}", jobID);
 		}
 		return classLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ea508d1..5bdfe1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.NetUtils;
@@ -191,13 +190,12 @@ public class BootstrapTools {
 		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
 
-			LeaderRetrievalService leaderRetrievalService =
-				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
 			// start the web frontend. we need to load this dynamically
 			// because it is not in the same project/dependencies
 			WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
-				config, leaderRetrievalService, actorSystem);
+				config,
+				highAvailabilityServices,
+				actorSystem);
 
 			// start the web monitor
 			if (monitor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index b0d5d83..0702a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -208,7 +208,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			LOG.warn("Failed to run clean up task before shutdown", t);
 		}
 
-		blobService.shutdown();
+		blobService.close();
 		cleanupTimer.cancel();
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index c9e2957..2ebfd20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
@@ -49,10 +51,13 @@ public class HighAvailabilityServicesUtils {
 				return new EmbeddedHaServices(executor);
 
 			case ZOOKEEPER:
+				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
 				return new ZooKeeperHaServices(
 					ZooKeeperUtils.startCuratorFramework(config),
 					executor,
-					config);
+					config,
+					blobStoreService);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
@@ -85,10 +90,13 @@ public class HighAvailabilityServicesUtils {
 
 				return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
 			case ZOOKEEPER:
+				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
+
 				return new ZooKeeperHaServices(
 					ZooKeeperUtils.startCuratorFramework(configuration),
 					executor,
-					configuration);
+					configuration,
+					blobStoreService);
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index ac90e3f..9c3d986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -44,10 +44,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 	private final RunningJobsRegistry runningJobsRegistry;
 
+	private final VoidBlobStore voidBlobStore;
+
 	private boolean shutdown;
 
 	public AbstractNonHaServices() {
 		this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+		this.voidBlobStore = new VoidBlobStore();
 
 		shutdown = false;
 	}
@@ -88,7 +91,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 		synchronized (lock) {
 			checkNotShutdown();
 
-			return new VoidBlobStore();
+			return voidBlobStore;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 5d895c1..d4748cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -23,11 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,12 +33,12 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
@@ -102,11 +99,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 	/** The zookeeper based running jobs registry */
 	private final RunningJobsRegistry runningJobsRegistry;
 
-	public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
+	/** Store for arbitrary blobs */
+	private final BlobStoreService blobStoreService;
+
+	public ZooKeeperHaServices(
+			CuratorFramework client,
+			Executor executor,
+			Configuration configuration,
+			BlobStoreService blobStoreService) {
 		this.client = checkNotNull(client);
 		this.executor = checkNotNull(executor);
 		this.configuration = checkNotNull(configuration);
 		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
+
+		this.blobStoreService = checkNotNull(blobStoreService);
 	}
 
 	// ------------------------------------------------------------------------
@@ -150,61 +156,52 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public BlobStore createBlobStore() throws IOException {
-		return createBlobStore(configuration);
+		return blobStoreService;
 	}
 
-	/**
-	 * Creates the BLOB store in which BLOBs are stored in a highly-available
-	 * fashion.
-	 *
-	 * @param configuration configuration to extract the storage path from
-	 * @return Blob store
-	 * @throws IOException if the blob store could not be created
-	 */
-	public static BlobStore createBlobStore(
-		final Configuration configuration) throws IOException {
-		String storagePath = configuration.getValue(
-			HighAvailabilityOptions.HA_STORAGE_PATH);
-		if (isNullOrWhitespaceOnly(storagePath)) {
-			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
-					HighAvailabilityOptions.HA_STORAGE_PATH);
-		}
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
 
-		final Path path;
-		try {
-			path = new Path(storagePath);
-		} catch (Exception e) {
-			throw new IOException("Invalid path for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-		}
+	@Override
+	public void close() throws Exception {
+		Throwable exception = null;
 
-		final FileSystem fileSystem;
 		try {
-			fileSystem = path.getFileSystem();
-		} catch (Exception e) {
-			throw new IOException("Could not create FileSystem for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+			blobStoreService.close();
+		} catch (Throwable t) {
+			exception = t;
 		}
 
-		final String clusterId =
-			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
-		storagePath += "/" + clusterId;
+		internalClose();
 
-		return new FileSystemBlobStore(fileSystem, storagePath);
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
+		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Shutdown
-	// ------------------------------------------------------------------------
-
 	@Override
-	public void close() throws Exception {
-		client.close();
+	public void closeAndCleanupAllData() throws Exception {
+		Throwable exception = null;
+
+		try {
+			blobStoreService.closeAndCleanupAllData();
+		} catch (Throwable t) {
+			exception = t;
+		}
+
+		internalClose();
+
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
+		}
 	}
 
-	@Override
-	public void closeAndCleanupAllData() throws Exception {
-		close();
+	/**
+	 * Closes the components that do not distinguish between close and closeAndCleanupAllData.
+	 */
+	private void internalClose() {
+		client.close();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 8cda0f7..ac4d06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -105,7 +105,7 @@ public class JobManagerServices {
 			Configuration config,
 			HighAvailabilityServices haServices) throws Exception {
 
-		final BlobServer blobServer = new BlobServer(config, haServices);
+		final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore());
 
 		final long cleanupInterval = config.getLong(
 			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d05d900..a919065 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -923,7 +923,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		final LibraryCacheManager libraryCacheManager;
 		try {
-			final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices);
+			final BlobCache blobCache = new BlobCache(
+				blobServerAddress,
+				taskManagerConfiguration.getConfiguration(),
+				haServices.createBlobStore());
 			libraryCacheManager = new BlobLibraryCacheManager(
 				blobCache,
 				taskManagerConfiguration.getCleanupInterval());

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2baadb5..106d0f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -28,10 +28,12 @@ import java.net.URI;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -117,12 +119,14 @@ public final class WebMonitorUtils {
 	 * Because failure to start the web runtime monitor is not considered fatal, this method does
 	 * not throw any exceptions, but only logs them.
 	 *
-	 * @param config                 The configuration for the runtime monitor.
-	 * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
+	 * @param config The configuration for the runtime monitor.
+	 * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor
+	 * @param actorSystem ActorSystem used to connect to the JobManager
+	 *
 	 */
 	public static WebMonitor startWebRuntimeMonitor(
 			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
+			HighAvailabilityServices highAvailabilityServices,
 			ActorSystem actorSystem) {
 		// try to load and instantiate the class
 		try {
@@ -130,9 +134,14 @@ public final class WebMonitorUtils {
 			Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);
 			
 			Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
-					LeaderRetrievalService.class,
-					ActorSystem.class);
-			return constructor.newInstance(config, leaderRetrievalService, actorSystem);
+				LeaderRetrievalService.class,
+				BlobView.class,
+				ActorSystem.class);
+			return constructor.newInstance(
+				config,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				highAvailabilityServices.createBlobStore(),
+				actorSystem);
 		} catch (ClassNotFoundException e) {
 			LOG.error("Could not load web runtime monitor. " +
 					"Probably reason: flink-runtime-web is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5092643..1ea783b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -36,7 +36,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
-import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore}
 import org.apache.flink.runtime.client._
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
@@ -2274,14 +2274,12 @@ object JobManager {
     val webMonitor: Option[WebMonitor] =
       if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
-        val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-          HighAvailabilityServices.DEFAULT_JOB_ID)
 
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
-          leaderRetrievalService,
+          highAvailabilityServices,
           jobManagerSystem)
 
         Option(webServer)
@@ -2507,12 +2505,14 @@ object JobManager {
    * @param configuration The configuration from which to parse the config values.
    * @param futureExecutor to run JobManager's futures
    * @param ioExecutor to run blocking io operations
+   * @param blobStore to store blobs persistently
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
-      ioExecutor: Executor) :
+      ioExecutor: Executor,
+      blobStore: BlobStore) :
     (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2558,7 +2558,7 @@ object JobManager {
     var libraryCacheManager: BlobLibraryCacheManager = null
 
     try {
-      blobServer = new BlobServer(configuration)
+      blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
@@ -2577,7 +2577,7 @@ object JobManager {
           instanceManager.shutdown()
         }
         if (blobServer != null) {
-          blobServer.shutdown()
+          blobServer.close()
         }
         
         throw t
@@ -2689,7 +2689,8 @@ object JobManager {
     metricsRegistry) = createJobManagerComponents(
       configuration,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2745,7 +2746,7 @@ object JobManager {
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
+    libraryCacheManager: LibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 46c4404..2ace8db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, High
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -387,17 +387,13 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 
-      // TODO: Add support for HA: Make web server work independently from the JM
-      val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID)
-
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
       val webServer = Option(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
-          leaderRetrievalService,
+          highAvailabilityServices,
           actorSystem)
       )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8677307..a535388 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -143,7 +143,8 @@ class LocalFlinkMiniCluster(
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricsRegistry.get.startQueryService(system, null)
@@ -249,8 +250,6 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID),
       metricRegistry)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
@@ -315,7 +314,6 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
     metricsRegistry: MetricRegistry): Props = {
 
     TaskManager.getTaskManagerProps(
@@ -326,7 +324,7 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3110a4..7684a6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -125,7 +125,7 @@ class TaskManager(
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
-    protected val leaderRetrievalService: LeaderRetrievalService,
+    protected val highAvailabilityServices: HighAvailabilityServices,
     protected val metricsRegistry: FlinkMetricRegistry)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
@@ -149,6 +149,10 @@ class TaskManager(
   /** Handler for distributed files cached by this TaskManager */
   protected val fileCache = new FileCache(config.getTmpDirectories())
 
+  protected val leaderRetrievalService: LeaderRetrievalService = highAvailabilityServices.
+    getJobManagerLeaderRetriever(
+      HighAvailabilityServices.DEFAULT_JOB_ID)
+
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Actors which want to be notified once this task manager has been
@@ -959,7 +963,10 @@ class TaskManager(
       log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
 
       try {
-        val blobcache = new BlobCache(address, config.getConfiguration())
+        val blobcache = new BlobCache(
+          address,
+          config.getConfiguration(),
+          highAvailabilityServices.createBlobStore())
         blobService = Option(blobcache)
         libraryCacheManager = Some(
           new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
@@ -1039,12 +1046,24 @@ class TaskManager(
 
     // shut down BLOB and library cache
     libraryCacheManager foreach {
-      manager => manager.shutdown()
+      manager =>
+        try {
+          manager.shutdown()
+        } catch {
+          case ioe: IOException => log.error(
+            "Could not properly shutdown library cache manager.",
+            ioe)
+        }
     }
     libraryCacheManager = None
 
     blobService foreach {
-      service => service.shutdown()
+      service =>
+        try {
+          service.close()
+        } catch {
+          case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe)
+        }
     }
     blobService = None
 
@@ -1905,9 +1924,6 @@ object TaskManager {
 
     val metricRegistry = taskManagerServices.getMetricRegistry()
 
-    val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-      HighAvailabilityServices.DEFAULT_JOB_ID)
-
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(
       taskManagerClass,
@@ -1917,7 +1933,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
 
     metricRegistry.startQueryService(actorSystem, resourceID)
@@ -1936,7 +1952,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricsRegistry: FlinkMetricRegistry
   ): Props = {
     Props(
@@ -1948,7 +1964,7 @@ object TaskManager {
       ioManager,
       networkEnvironment,
       taskManagerConfig.getNumberSlots(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 34a8a39..1cf77ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -43,10 +43,10 @@ public class BlobCacheRetriesTest {
 	 * A test where the connection fails twice and then the get operation succeeds.
 	 */
 	@Test
-	public void testBlobFetchRetries() {
+	public void testBlobFetchRetries() throws IOException {
 		final Configuration config = new Configuration();
 
-		testBlobFetchRetries(config);
+		testBlobFetchRetries(config, new VoidBlobStore());
 	}
 
 	/**
@@ -54,13 +54,23 @@ public class BlobCacheRetriesTest {
 	 * (with high availability set).
 	 */
 	@Test
-	public void testBlobFetchRetriesHa() {
+	public void testBlobFetchRetriesHa() throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.getRoot().getPath());
 
-		testBlobFetchRetries(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobFetchRetries(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	/**
@@ -71,14 +81,14 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchRetries(final Configuration config) {
+	private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException {
 		final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
 		BlobServer server = null;
 		BlobCache cache = null;
 		try {
 
-			server = new TestingFailingBlobServer(config, 2);
+			server = new TestingFailingBlobServer(config, blobStore, 2);
 
 			final InetSocketAddress
 				serverAddress = new InetSocketAddress("localhost", server.getPort());
@@ -97,13 +107,7 @@ public class BlobCacheRetriesTest {
 				}
 			}
 
-			// create a separate config for the cache with no access to
-			// the (shared) storage path if available so that the cache
-			// will always bother the BlobServer!
-			final Configuration cacheConfig = new Configuration(config);
-			cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-				temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			cache = new BlobCache(serverAddress, cacheConfig);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail the first two times, but retry, and succeed eventually
 			URL url = cache.getURL(key);
@@ -116,17 +120,12 @@ public class BlobCacheRetriesTest {
 			finally {
 				is.close();
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
@@ -135,10 +134,10 @@ public class BlobCacheRetriesTest {
 	 * A test where the connection fails too often and eventually fails the GET request.
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailures() {
+	public void testBlobFetchWithTooManyFailures() throws IOException {
 		final Configuration config = new Configuration();
 
-		testBlobFetchWithTooManyFailures(config);
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
 	}
 
 	/**
@@ -146,13 +145,23 @@ public class BlobCacheRetriesTest {
 	 * (with high availability set).
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailuresHa() {
+	public void testBlobFetchWithTooManyFailuresHa() throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.getRoot().getPath());
 
-		testBlobFetchWithTooManyFailures(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobFetchWithTooManyFailures(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	/**
@@ -163,14 +172,14 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchWithTooManyFailures(final Configuration config) {
+	private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException {
 		final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
 		BlobServer server = null;
 		BlobCache cache = null;
 		try {
 
-			server = new TestingFailingBlobServer(config, 10);
+			server = new TestingFailingBlobServer(config, blobStore, 10);
 
 			final InetSocketAddress
 				serverAddress = new InetSocketAddress("localhost", server.getPort());
@@ -189,13 +198,7 @@ public class BlobCacheRetriesTest {
 				}
 			}
 
-			// create a separate config for the cache with no access to
-			// the (shared) storage path if available so that the cache
-			// will always bother the BlobServer!
-			final Configuration cacheConfig = new Configuration(config);
-			cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-				temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			cache = new BlobCache(serverAddress, cacheConfig);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail eventually
 			try {
@@ -206,16 +209,12 @@ public class BlobCacheRetriesTest {
 				// as we expected
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index db55331..2a65a3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -49,7 +50,7 @@ public class BlobCacheSuccessTest {
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCache() {
+	public void testBlobCache() throws IOException {
 		Configuration config = new Configuration();
 		uploadFileGetTest(config, false, false);
 	}
@@ -60,7 +61,7 @@ public class BlobCacheSuccessTest {
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCacheHa() {
+	public void testBlobCacheHa() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -73,7 +74,7 @@ public class BlobCacheSuccessTest {
 	 * file system and thus needs to download BLOBs from the BlobServer.
 	 */
 	@Test
-	public void testBlobCacheHaFallback() {
+	public void testBlobCacheHaFallback() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -82,17 +83,30 @@ public class BlobCacheSuccessTest {
 	}
 
 	private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer,
-		boolean cacheHasAccessToFs) {
+		boolean cacheHasAccessToFs) throws IOException {
 		// First create two BLOBs and upload them to BLOB server
 		final byte[] buf = new byte[128];
 		final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
 
 		BlobServer blobServer = null;
 		BlobCache blobCache = null;
+		BlobStoreService blobStoreService = null;
 		try {
+			final Configuration cacheConfig;
+			if (cacheHasAccessToFs) {
+				cacheConfig = config;
+			} else {
+				// just in case parameters are still read from the server,
+				// create a separate configuration object for the cache
+				cacheConfig = new Configuration(config);
+				cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+					temporaryFolder.getRoot().getPath() + "/does-not-exist");
+			}
+
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig);
 
 			// Start the BLOB server
-			blobServer = new BlobServer(config);
+			blobServer = new BlobServer(config, blobStoreService);
 			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort());
 
 			// Upload BLOBs
@@ -112,22 +126,11 @@ public class BlobCacheSuccessTest {
 
 			if (cacheWorksWithoutServer) {
 				// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
-				blobServer.shutdown();
+				blobServer.close();
 				blobServer = null;
 			}
 
-			final Configuration cacheConfig;
-			if (cacheHasAccessToFs) {
-				cacheConfig = config;
-			} else {
-				// just in case parameters are still read from the server,
-				// create a separate configuration object for the cache
-				cacheConfig = new Configuration(config);
-				cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-					temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			}
-
-			blobCache = new BlobCache(serverAddress, cacheConfig);
+			blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
 
 			for (BlobKey blobKey : blobKeys) {
 				blobCache.getURL(blobKey);
@@ -135,7 +138,7 @@ public class BlobCacheSuccessTest {
 
 			if (blobServer != null) {
 				// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
-				blobServer.shutdown();
+				blobServer.close();
 				blobServer = null;
 			}
 
@@ -162,18 +165,17 @@ public class BlobCacheSuccessTest {
 					fail(e.getMessage());
 				}
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (blobServer != null) {
-				blobServer.shutdown();
+				blobServer.close();
 			}
 
 			if(blobCache != null){
-				blobCache.shutdown();
+				blobCache.close();
+			}
+
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
 			}
 		}
 	}


[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..c106b3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +43,7 @@ import org.junit.Test;
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
 
 	/** The buffer size used during the tests in bytes. */
 	private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -63,19 +64,14 @@ public class BlobClientSslTest {
 	 * Starts the SSL enabled BLOB server.
 	 */
 	@BeforeClass
-	public static void startSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SSL_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
 
 		sslClientConfig = new Configuration();
 		sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -87,20 +83,14 @@ public class BlobClientSslTest {
 	 * Starts the SSL disabled BLOB server.
 	 */
 	@BeforeClass
-	public static void startNonSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startNonSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
 		clientConfig = new Configuration();
 		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -113,13 +103,13 @@ public class BlobClientSslTest {
 	 * Shuts the BLOB server down.
 	 */
 	@AfterClass
-	public static void stopServers() {
+	public static void stopServers() throws IOException {
 		if (BLOB_SSL_SERVER != null) {
-			BLOB_SSL_SERVER.shutdown();
+			BLOB_SSL_SERVER.close();
 		}
 
 		if (BLOB_SERVER != null) {
-			BLOB_SERVER.shutdown();
+			BLOB_SERVER.close();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 8f8f8c5..fda4ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -57,24 +57,18 @@ public class BlobClientTest {
 	 * Starts the BLOB server.
 	 */
 	@BeforeClass
-	public static void startServer() {
-		try {
-			blobServiceConfig = new Configuration();
-			BLOB_SERVER = new BlobServer(blobServiceConfig);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startServer() throws IOException {
+		blobServiceConfig = new Configuration();
+		BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore());
 	}
 
 	/**
 	 * Shuts the BLOB server down.
 	 */
 	@AfterClass
-	public static void stopServer() {
+	public static void stopServer() throws IOException {
 		if (BLOB_SERVER != null) {
-			BLOB_SERVER.shutdown();
+			BLOB_SERVER.close();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index f8d50d5..4f12ddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -30,16 +30,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -59,10 +56,20 @@ public class BlobRecoveryITCase extends TestLogger {
 		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
 
-		testBlobServerRecovery(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobServerRecovery(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
-	public static void testBlobServerRecovery(final Configuration config) throws IOException {
+	public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
 		final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
 		String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
 		Random rand = new Random();
@@ -73,7 +80,7 @@ public class BlobRecoveryITCase extends TestLogger {
 
 		try {
 			for (int i = 0; i < server.length; i++) {
-				server[i] = new BlobServer(config);
+				server[i] = new BlobServer(config, blobStore);
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
 			}
 
@@ -166,7 +173,7 @@ public class BlobRecoveryITCase extends TestLogger {
 		finally {
 			for (BlobServer s : server) {
 				if (s != null) {
-					s.shutdown();
+					s.close();
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 025a2ff..e8e28a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -44,10 +44,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteSingle() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -93,10 +94,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAll() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -156,10 +158,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAlreadyDeletedByBlobKey() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -195,10 +198,11 @@ public class BlobServerDeleteTest {
 	public void testDeleteAlreadyDeletedByName() {
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -237,10 +241,11 @@ public class BlobServerDeleteTest {
 
 		BlobServer server = null;
 		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, blobStore);
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -289,7 +294,11 @@ public class BlobServerDeleteTest {
 			}
 		}
 		if (server != null) {
-			server.shutdown();
+			try {
+				server.close();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 59a62e1..6d1dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -40,13 +40,13 @@ public class BlobServerGetTest {
 	private final Random rnd = new Random();
 
 	@Test
-	public void testGetFailsDuringLookup() {
+	public void testGetFailsDuringLookup() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -66,37 +66,27 @@ public class BlobServerGetTest {
 			try {
 				client.get(key);
 				fail("This should not succeed.");
-			}
-			catch (IOException e) {
+			} catch (IOException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testGetFailsDuringStreaming() {
+	public void testGetFailsDuringStreaming() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -129,21 +119,12 @@ public class BlobServerGetTest {
 			catch (IOException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c4d6d1c..441ca7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -42,13 +42,13 @@ public class BlobServerPutTest {
 	private final Random rnd = new Random();
 
 	@Test
-	public void testPutBufferSuccessful() {
+	public void testPutBufferSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -95,34 +95,25 @@ public class BlobServerPutTest {
 			BlobUtils.readFully(is3, result3, 0, result3.length, null);
 			is3.close();
 			assertArrayEquals(data, result3);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 
 	@Test
-	public void testPutStreamSuccessful() {
+	public void testPutStreamSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -143,12 +134,7 @@ public class BlobServerPutTest {
 				String stringKey = "my test key";
 				client.put(jid, stringKey, new ByteArrayInputStream(data));
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
 				try {
 					client.close();
@@ -157,19 +143,19 @@ public class BlobServerPutTest {
 				}
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessful() {
+	public void testPutChunkedStreamSuccessful() throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			client = new BlobClient(serverAddress, config);
@@ -190,27 +176,18 @@ public class BlobServerPutTest {
 				String stringKey = "my test key";
 				client.put(jid, stringKey, new ChunkedInputStream(data, 17));
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutBufferFails() {
+	public void testPutBufferFails() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -219,7 +196,7 @@ public class BlobServerPutTest {
 		File tempFileDir = null;
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			// make sure the blob server cannot create any files in its storage dir
 			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -250,31 +227,22 @@ public class BlobServerPutTest {
 				// expected
 			}
 
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			// set writable again to make sure we can remove the directory
 			if (tempFileDir != null) {
 				tempFileDir.setWritable(true, false);
 			}
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
 
 	@Test
-	public void testPutNamedBufferFails() {
+	public void testPutNamedBufferFails() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -283,7 +251,7 @@ public class BlobServerPutTest {
 		File tempFileDir = null;
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 
 			// make sure the blob server cannot create any files in its storage dir
 			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -317,25 +285,16 @@ public class BlobServerPutTest {
 			catch (IllegalStateException e) {
 				// expected
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			// set writable again to make sure we can remove the directory
 			if (tempFileDir != null) {
 				tempFileDir.setWritable(true, false);
 			}
 			if (client != null) {
-				try {
-					client.close();
-				} catch (Throwable t) {
-					t.printStackTrace();
-				}
+				client.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..fbcd4a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger {
 	public void testOnEphemeralPort() throws IOException {
 		Configuration conf = new Configuration();
 		conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
-		BlobServer srv = new BlobServer(conf);
-		srv.shutdown();
+		BlobServer srv = new BlobServer(conf, new VoidBlobStore());
+		srv.close();
 	}
 
 	/**
@@ -63,7 +63,7 @@ public class BlobServerRangeTest extends TestLogger {
 
 		// this thing is going to throw an exception
 		try {
-			BlobServer srv = new BlobServer(conf);
+			BlobServer srv = new BlobServer(conf, new VoidBlobStore());
 		} finally {
 			socket.close();
 		}
@@ -92,9 +92,9 @@ public class BlobServerRangeTest extends TestLogger {
 
 		// this thing is going to throw an exception
 		try {
-			BlobServer srv = new BlobServer(conf);
+			BlobServer srv = new BlobServer(conf, new VoidBlobStore());
 			Assert.assertEquals(availablePort, srv.getPort());
-			srv.shutdown();
+			srv.close();
 		} finally {
 			sockets[0].close();
 			sockets[1].close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 93f9b73..91e119b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer {
 
 	private int numFailures;
 
-	public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException {
-		super(config);
+	public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException {
+		super(config, blobStore);
 		this.numFailures = numFailures;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5d9ade3..98e6b3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
@@ -45,7 +44,7 @@ import java.util.List;
 public class BlobLibraryCacheManagerTest {
 
 	@Test
-	public void testLibraryCacheManagerCleanup() {
+	public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException {
 
 		JobID jid = new JobID();
 		List<BlobKey> keys = new ArrayList<BlobKey>();
@@ -56,7 +55,7 @@ public class BlobLibraryCacheManagerTest {
 
 		try {
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
@@ -108,14 +107,9 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(2, caughtExceptions);
 
 			bc.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 
 			if (libraryCacheManager != null) {
@@ -130,7 +124,7 @@ public class BlobLibraryCacheManagerTest {
 	}
 
 	@Test
-	public void testRegisterAndDownload() {
+	public void testRegisterAndDownload() throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -139,9 +133,9 @@ public class BlobLibraryCacheManagerTest {
 		try {
 			// create the blob transfer services
 			Configuration config = new Configuration();
-			server = new BlobServer(config);
+			server = new BlobServer(config, new VoidBlobStore());
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
-			cache = new BlobCache(serverAddress, config);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// upload some meaningless data to the server
 			BlobClient uploader = new BlobClient(serverAddress, config);
@@ -210,22 +204,17 @@ public class BlobLibraryCacheManagerTest {
 			catch (IOException e) {
 				// splendid!
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (cacheDir != null) {
 				if (!cacheDir.setWritable(true, false)) {
 					System.err.println("Could not re-add write permissions to cache directory.");
 				}
 			}
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 54e1a9b..16e3a05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
@@ -63,6 +65,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
 		BlobCache cache = null;
 		BlobLibraryCacheManager libCache = null;
+		BlobStoreService blobStoreService = null;
 
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
@@ -70,8 +73,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
 			for (int i = 0; i < server.length; i++) {
-				server[i] = new BlobServer(config);
+				server[i] = new BlobServer(config, blobStoreService);
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
 				libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
 			}
@@ -89,7 +94,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// The cache
-			cache = new BlobCache(serverAddress[0], config);
+			cache = new BlobCache(serverAddress[0], config, blobStoreService);
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Register uploaded libraries
@@ -110,10 +115,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Shutdown cache and start with other server
-			cache.shutdown();
+			cache.close();
 			libCache.shutdown();
 
-			cache = new BlobCache(serverAddress[1], config);
+			cache = new BlobCache(serverAddress[1], config, blobStoreService);
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Verify key 1
@@ -156,17 +161,21 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 		finally {
 			for (BlobServer s : server) {
 				if (s != null) {
-					s.shutdown();
+					s.close();
 				}
 			}
 
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 
 			if (libCache != null) {
 				libCache.shutdown();
 			}
+
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
index 06ffe3c..d89093d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -22,11 +22,11 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -62,7 +62,10 @@ public class ZooKeeperRegistryTest extends TestLogger {
 		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
 		final HighAvailabilityServices zkHaService = new ZooKeeperHaServices(
-				ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+				ZooKeeperUtils.startCuratorFramework(configuration),
+			Executors.directExecutor(),
+			configuration,
+			new VoidBlobStore());
 
 		final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b8b5984..a63b02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -102,7 +100,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.runtime.BoxedUnit;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -190,7 +187,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				instanceManager,
 				scheduler,
-				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+				new BlobLibraryCacheManager(
+					new BlobServer(
+						flinkConfiguration,
+						testingHighAvailabilityServices.createBlobStore()),
+					3600000L),
 				archive,
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index d6257ba..70800e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -184,7 +185,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+			new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
 			ActorRef.noSender(),
 			new NoRestartStrategy.NoRestartStrategyFactory(),
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0ea47f2..0282a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,17 +18,21 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -64,10 +68,13 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
 
-		highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
+		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config);
+
+		highAvailabilityServices = new ZooKeeperHaServices(
+			client,
 			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+			config,
+			new VoidBlobStore());
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 58f2231..d6fc48c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -97,7 +97,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				taskManagerServices.getMemoryManager(),
 				taskManagerServices.getIOManager(),
 				taskManagerServices.getNetworkEnvironment(),
-				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				highAvailabilityServices,
 				tmRegistry);
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 2a4c036..9dcfc70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -149,9 +149,6 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 
 			network.start();
 
-			LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-				HighAvailabilityServices.DEFAULT_JOB_ID);
-
 			MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
 
 			// create the task manager
@@ -164,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				ioManager,
 				network,
 				numberOfSlots,
-				leaderRetrievalService,
+				highAvailabilityServices,
 				new MetricRegistry(metricRegistryConfiguration));
 
 			taskManager = actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 92de31a..0844aad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -57,6 +58,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -601,7 +603,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 	}
 
 	@Test
-	public void testCheckForValidRegistrationSessionIDs() {
+	public void testCheckForValidRegistrationSessionIDs() throws IOException {
 		new JavaTestKit(actorSystem) {{
 
 			ActorGateway taskManagerGateway = null;
@@ -612,6 +614,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class);
 			when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
 				.thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID));
+			when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore());
 
 			try {
 				// we make the test actor (the test kit) the JobManager to intercept

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4be3299..1b9ee48 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
     val components = JobManager.createJobManagerComponents(
       config,
       executor,
-      executor)
+      executor,
+      highAvailabilityServices.createBlobStore())
 
     // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService.
     // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 09dc5ed..1db0a85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -32,15 +32,15 @@ import scala.language.postfixOps
 /** Subclass of the [[TaskManager]] to support testing messages
  */
 class TestingTaskManager(
-                          config: TaskManagerConfiguration,
-                          resourceID: ResourceID,
-                          connectionInfo: TaskManagerLocation,
-                          memoryManager: MemoryManager,
-                          ioManager: IOManager,
-                          network: NetworkEnvironment,
-                          numberOfSlots: Int,
-                          leaderRetrievalService: LeaderRetrievalService,
-                          metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
     resourceID,
@@ -49,19 +49,19 @@ class TestingTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 
   def this(
-            config: TaskManagerConfiguration,
-            connectionInfo: TaskManagerLocation,
-            memoryManager: MemoryManager,
-            ioManager: IOManager,
-            network: NetworkEnvironment,
-            numberOfSlots: Int,
-            leaderRetrievalService: LeaderRetrievalService,
-            metricRegistry : MetricRegistry) {
+    config: TaskManagerConfiguration,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry) {
     this(
       config,
       ResourceID.generate(),
@@ -70,7 +70,7 @@ class TestingTaskManager(
       ioManager,
       network,
       numberOfSlots,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 5f9d178..2983d66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -155,6 +155,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath());
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 0f82faa..1df4b8d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
@@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   * @param ioManager IOManager responsible for I/O
   * @param network NetworkEnvironment for this actor
   * @param numberOfSlots Number of slots for this TaskManager
-  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
-  *                              JobManager
+  * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval
+  *                                service for retrieving the leading JobManager
   */
 class TestingYarnTaskManager(
-                              config: TaskManagerConfiguration,
-                              resourceID: ResourceID,
-                              connectionInfo: TaskManagerLocation,
-                              memoryManager: MemoryManager,
-                              ioManager: IOManager,
-                              network: NetworkEnvironment,
-                              numberOfSlots: Int,
-                              leaderRetrievalService: LeaderRetrievalService,
-                              metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends YarnTaskManager(
     config,
     resourceID,
@@ -61,7 +62,7 @@ class TestingYarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index e9c3904..f81d040 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.FileSystemBlobStore;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	 * HA services clean up */
 	protected final Path haDataDirectory;
 
+	/** Blob store service to be used for the BlobServer and BlobCache */
+	protected final BlobStoreService blobStoreService;
+
 	/** Flag marking this instance as shut down */
 	private volatile boolean closed;
 
@@ -153,6 +157,8 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 		}
 
 		LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+
+		blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
 	}
 
 	// ------------------------------------------------------------------------
@@ -163,7 +169,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 	public BlobStore createBlobStore() throws IOException {
 		enter();
 		try {
-			return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+			return blobStoreService;
 		} finally {
 			exit();
 		}
@@ -192,11 +198,23 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 			}
 			closed = true;
 
+			Throwable exception = null;
+
+			try {
+				blobStoreService.close();
+			} catch (Throwable t) {
+				exception = t;
+			}
+
 			// we do not propagate exceptions here, but only log them
 			try {
 				hadoopFileSystem.close();
 			} catch (Throwable t) {
-				LOG.warn("Error closing Hadoop FileSystem", t);
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+
+			if (exception != null) {
+				ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices.");
 			}
 		}
 		finally {
@@ -213,12 +231,18 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 			// we remember exceptions only, then continue cleanup, and re-throw at the end
 			Throwable exception = null;
 
+			try {
+				blobStoreService.closeAndCleanupAllData();
+			} catch (Throwable t) {
+				exception = t;
+			}
+
 			// first, we delete all data in Flink's data directory
 			try {
 				flinkFileSystem.delete(haDataDirectory, true);
 			}
 			catch (Throwable t) {
-				exception = t;
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
 			}
 
 			// now we actually close the services

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index be31085..b7f4c9a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
@@ -38,7 +38,7 @@ class YarnTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class YarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {


[5/5] flink git commit: [FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

Posted by tr...@apache.org.
[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize file creation
and deletion operations in BlobServerConnection and BlobServer. This prevents multiple
put and delete operations from interfering with each other and with get operations.

The get operations are synchronized using the read lock so that multiple get operations
can still run in parallel.

Add Get and Delete operation tests

This closes #3888.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60873b0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60873b0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60873b0c

Branch: refs/heads/release-1.3
Commit: 60873b0c57be7b83d55af179b4be4defc46d80de
Parents: e3ea89a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 10 17:38:49 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:17:02 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 ++-
 .../runtime/blob/BlobServerConnection.java      | 240 +++++++++++++++----
 .../runtime/blob/BlobServerDeleteTest.java      |  73 +++++-
 .../flink/runtime/blob/BlobServerGetTest.java   | 115 ++++++++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 109 ++++++++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 7 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ea90f54..3da7ef2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -538,7 +538,7 @@ public final class BlobClient implements Closeable {
 			throw new IOException("Server side error: " + cause.getMessage(), cause);
 		}
 		else {
-			throw new IOException("Unrecognized response");
+			throw new IOException("Unrecognized response: " + response + '.');
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index a006981..ba04c41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService {
 	/** The maximum number of concurrent connections */
 	private final int maxConnections;
 
+	/** Lock guarding concurrent file accesses */
+	private final ReadWriteLock readWriteLock;
+
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
 	 * the configured high availability mode does not equal{@link HighAvailabilityMode#NONE})
@@ -104,6 +109,7 @@ public class BlobServer extends Thread implements BlobService {
 	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
 		this.blobServiceConfiguration = checkNotNull(config);
 		this.blobStore = checkNotNull(blobStore);
+		this.readWriteLock = new ReentrantReadWriteLock();
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -237,6 +243,13 @@ public class BlobServer extends Thread implements BlobService {
 		return blobStore;
 	}
 
+	/**
+	 * Returns the lock used to guard file accesses
+	 */
+	public ReadWriteLock getReadWriteLock() {
+		return readWriteLock;
+	}
+
 	@Override
 	public void run() {
 		try {
@@ -397,13 +410,19 @@ public class BlobServer extends Thread implements BlobService {
 	public void delete(BlobKey key) throws IOException {
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
-		if (localFile.exists()) {
-			if (!localFile.delete()) {
-				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+		readWriteLock.writeLock().lock();
+
+		try {
+			if (localFile.exists()) {
+				if (!localFile.delete()) {
+					LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+				}
 			}
-		}
 
-		blobStore.delete(key);
+			blobStore.delete(key);
+		} finally {
+			readWriteLock.writeLock().unlock();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 13a90c6..a76dbd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import com.google.common.io.Files;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -33,7 +32,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
 import java.security.MessageDigest;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
@@ -67,6 +70,12 @@ class BlobServerConnection extends Thread {
 	/** The HA blob store. */
 	private final BlobStore blobStore;
 
+	/** Write lock to synchronize file accesses */
+	private final Lock writeLock;
+
+	/** Read lock to synchronize file accesses */
+	private final Lock readLock;
+
 	/**
 	 * Creates a new BLOB connection for a client request
 	 * 
@@ -74,7 +83,7 @@ class BlobServerConnection extends Thread {
 	 * @param blobServer The BLOB server.
 	 */
 	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
-		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
+		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
 		setDaemon(true);
 
 		if (blobServer == null) {
@@ -84,6 +93,11 @@ class BlobServerConnection extends Thread {
 		this.clientSocket = clientSocket;
 		this.blobServer = blobServer;
 		this.blobStore = blobServer.getBlobStore();
+
+		ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
+
+		this.writeLock = readWriteLock.writeLock();
+		this.readLock = readWriteLock.readLock();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -178,8 +192,13 @@ class BlobServerConnection extends Thread {
 		 */
 
 		File blobFile;
+		int contentAddressable = -1;
+		JobID jobId = null;
+		String key = null;
+		BlobKey blobKey = null;
+
 		try {
-			final int contentAddressable = inputStream.read();
+			contentAddressable = inputStream.read();
 
 			if (contentAddressable < 0) {
 				throw new EOFException("Premature end of GET request");
@@ -189,37 +208,18 @@ class BlobServerConnection extends Thread {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 
-				JobID jobID = JobID.fromByteArray(jidBytes);
-				String key = readKey(buf, inputStream);
-				blobFile = this.blobServer.getStorageLocation(jobID, key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(jobID, key, blobFile);
-				}
+				jobId = JobID.fromByteArray(jidBytes);
+				key = readKey(buf, inputStream);
+				blobFile = blobServer.getStorageLocation(jobId, key);
 			}
 			else if (contentAddressable == CONTENT_ADDRESSABLE) {
-				final BlobKey key = BlobKey.readFromInputStream(inputStream);
-				blobFile = blobServer.getStorageLocation(key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(key, blobFile);
-				}
+				blobKey = BlobKey.readFromInputStream(inputStream);
+				blobFile = blobServer.getStorageLocation(blobKey);
 			}
 			else {
-				throw new IOException("Unknown type of BLOB addressing.");
-			}
-
-			// Check if BLOB exists
-			if (!blobFile.exists()) {
-				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
-			}
-
-			if (blobFile.length() > Integer.MAX_VALUE) {
-				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
 			}
 
-			outputStream.write(RETURN_OKAY);
-
 			// up to here, an error can give a good message
 		}
 		catch (Throwable t) {
@@ -235,8 +235,58 @@ class BlobServerConnection extends Thread {
 			return;
 		}
 
-		// from here on, we started sending data, so all we can do is close the connection when something happens
+		readLock.lock();
+
 		try {
+			try {
+				if (!blobFile.exists()) {
+					// first we have to release the read lock in order to acquire the write lock
+					readLock.unlock();
+					writeLock.lock();
+
+					try {
+						if (blobFile.exists()) {
+							LOG.debug("Blob file {} has downloaded from the BlobStore by a different connection.", blobFile);
+						} else {
+							if (contentAddressable == NAME_ADDRESSABLE) {
+								blobStore.get(jobId, key, blobFile);
+							} else if (contentAddressable == CONTENT_ADDRESSABLE) {
+								blobStore.get(blobKey, blobFile);
+							} else {
+								throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+							}
+						}
+					} finally {
+						writeLock.unlock();
+					}
+
+					readLock.lock();
+
+					// Check if BLOB exists
+					if (!blobFile.exists()) {
+						throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
+					}
+				}
+
+				if (blobFile.length() > Integer.MAX_VALUE) {
+					throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				}
+
+				outputStream.write(RETURN_OKAY);
+			} catch (Throwable t) {
+				LOG.error("GET operation failed", t);
+				try {
+					writeErrorToStream(outputStream, t);
+				}
+				catch (IOException e) {
+					// since we are in an exception case, it means not much that we could not send the error
+					// ignore this
+				}
+				clientSocket.close();
+				return;
+			}
+
+			// from here on, we started sending data, so all we can do is close the connection when something happens
 			int blobLen = (int) blobFile.length();
 			writeLength(blobLen, outputStream);
 
@@ -251,14 +301,14 @@ class BlobServerConnection extends Thread {
 					bytesRemaining -= read;
 				}
 			}
-		}
-		catch (SocketException e) {
+		} catch (SocketException e) {
 			// happens when the other side disconnects
 			LOG.debug("Socket connection closed", e);
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("GET operation failed", t);
 			clientSocket.close();
+		} finally {
+			readLock.unlock();
 		}
 	}
 
@@ -328,21 +378,83 @@ class BlobServerConnection extends Thread {
 			fos.close();
 
 			if (contentAddressable == NAME_ADDRESSABLE) {
-				File storageFile = this.blobServer.getStorageLocation(jobID, key);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
+				File storageFile = blobServer.getStorageLocation(jobID, key);
 
-				blobStore.put(storageFile, jobID, key);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
+								"BlobServer use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, jobID, key);
+					}
+				} catch(IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				outputStream.write(RETURN_OKAY);
 			}
 			else {
 				BlobKey blobKey = new BlobKey(md.digest());
 				File storageFile = blobServer.getStorageLocation(blobKey);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
 
-				blobStore.put(storageFile, blobKey);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple" +
+								"BlobServer use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, blobKey);
+					}
+				} catch(IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				// Return computed key to client for validation
 				outputStream.write(RETURN_OKAY);
@@ -397,12 +509,21 @@ class BlobServerConnection extends Thread {
 
 			if (type == CONTENT_ADDRESSABLE) {
 				BlobKey key = BlobKey.readFromInputStream(inputStream);
-				File blobFile = this.blobServer.getStorageLocation(key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
+				File blobFile = blobServer.getStorageLocation(key);
+
+				writeLock.lock();
 
-				blobStore.delete(key);
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == NAME_ADDRESSABLE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -412,20 +533,37 @@ class BlobServerConnection extends Thread {
 				String key = readKey(buf, inputStream);
 
 				File blobFile = this.blobServer.getStorageLocation(jobID, key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
 
-				blobStore.delete(jobID, key);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(jobID, key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == JOB_ID_SCOPE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 				JobID jobID = JobID.fromByteArray(jidBytes);
 
-				blobServer.deleteJobDirectory(jobID);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					blobServer.deleteJobDirectory(jobID);
 
-				blobStore.deleteAll(jobID);
+					blobStore.deleteAll(jobID);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else {
 				throw new IOException("Unrecognized addressing type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index e8e28a1..5e1d86e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -20,23 +20,35 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests how DELETE requests behave.
  */
-public class BlobServerDeleteTest {
+public class BlobServerDeleteTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -285,6 +297,65 @@ public class BlobServerDeleteTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exists
+	 * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
+	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
+	 * broken.
+	 */
+	@Test
+	public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int concurrentDeleteOperations = 3;
+		final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
+
+		final List<Future<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
+
+		final byte[] data = {1, 2, 3};
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			final BlobKey blobKey;
+
+			try (BlobClient client = blobServer.createClient()) {
+				blobKey = client.put(data);
+			}
+
+			assertTrue(blobServer.getStorageLocation(blobKey).exists());
+
+			for (int i = 0; i < concurrentDeleteOperations; i++) {
+				Future<Void> deleteFuture = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							blobClient.delete(blobKey);
+						}
+
+						return null;
+					}
+				}, executor);
+
+				deleteFutures.add(deleteFuture);
+			}
+
+			Future<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
+
+			// make sure all delete operations have completed successfully
+			// in case of no lock, one of the delete operations should eventually fail
+			waitFuture.get();
+
+			assertFalse(blobServer.getStorageLocation(blobKey).exists());
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
 	private void cleanup(BlobServer server, BlobClient client) {
 		if (client != null) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 6d1dba8..a3de02f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,27 +18,57 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests how failing GET requests behave in the presence of failures.
  * Successful GET requests are tested in conjunction wit the PUT
  * requests.
  */
-public class BlobServerGetTest {
+public class BlobServerGetTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testGetFailsDuringLookup() throws IOException {
 		BlobServer server = null;
@@ -128,4 +158,87 @@ public class BlobServerGetTest {
 			}
 		}
 	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 */
+	@Test
+	public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+
+		configuration.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, temporaryFolder.newFolder().getAbsolutePath());
+
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int numberConcurrentGetOperations = 3;
+		final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+		final byte[] data = {1, 2, 3, 4, 99, 42};
+		final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+		MessageDigest md = BlobUtils.createMessageDigest();
+
+		// create the correct blob key by hashing our input data
+		final BlobKey blobKey = new BlobKey(md.digest(data));
+
+		doAnswer(
+			new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					File targetFile = (File) invocation.getArguments()[1];
+
+					FileUtils.copyInputStreamToFile(bais, targetFile);
+
+					return null;
+				}
+			}
+		).when(blobStore).get(any(BlobKey.class), any(File.class));
+
+		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+			for (int i = 0; i < numberConcurrentGetOperations; i++) {
+				Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
+					@Override
+					public InputStream call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient();
+							 InputStream inputStream = blobClient.get(blobKey)) {
+							byte[] buffer = new byte[data.length];
+
+							IOUtils.readFully(inputStream, buffer);
+
+							return new ByteArrayInputStream(buffer);
+						}
+					}
+				}, executor);
+
+				getOperations.add(getOperation);
+			}
+
+			Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+
+			Collection<InputStream> inputStreams = inputStreamsFuture.get();
+
+			// check that we have read the right data
+			for (InputStream inputStream : inputStreams) {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+
+				IOUtils.copy(inputStream, baos);
+
+				baos.close();
+				byte[] input = baos.toByteArray();
+
+				assertArrayEquals(data, input);
+
+				inputStream.close();
+			}
+
+			// verify that we downloaded the requested blob exactly once from the BlobStore
+			verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 441ca7d..35ef968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -28,16 +33,29 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for successful and failing PUT operations against the BLOB server,
  * and successful GET operations.
  */
-public class BlobServerPutTest {
+public class BlobServerPutTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -299,6 +317,95 @@ public class BlobServerPutTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+	 */
+	@Test
+	public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		BlobStore blobStore = mock(BlobStore.class);
+		int concurrentPutOperations = 2;
+		int dataSize = 1024;
+
+		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
+		final byte[] data = new byte[dataSize];
+
+		ArrayList<Future<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+
+		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
+
+		try (
+			final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			for (int i = 0; i < concurrentPutOperations; i++) {
+				Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
+					@Override
+					public BlobKey call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+						}
+					}
+				}, executor);
+
+				allFutures.add(putFuture);
+			}
+
+			FutureUtils.ConjunctFuture<Collection<BlobKey>> conjunctFuture = FutureUtils.combineAll(allFutures);
+
+			// wait until all operations have completed and check that no exception was thrown
+			Collection<BlobKey> blobKeys = conjunctFuture.get();
+
+			Iterator<BlobKey> blobKeyIterator = blobKeys.iterator();
+
+			assertTrue(blobKeyIterator.hasNext());
+
+			BlobKey blobKey = blobKeyIterator.next();
+
+			// make sure that all blob keys are the same
+			while(blobKeyIterator.hasNext()) {
+				assertEquals(blobKey, blobKeyIterator.next());
+			}
+
+			// check that we only uploaded the file once to the blob store
+			verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
+	private static final class BlockingInputStream extends InputStream {
+
+		private final CountDownLatch countDownLatch;
+		private final byte[] data;
+		private int index = 0;
+
+		public BlockingInputStream(CountDownLatch countDownLatch, byte[] data) {
+			this.countDownLatch = Preconditions.checkNotNull(countDownLatch);
+			this.data = Preconditions.checkNotNull(data);
+		}
+
+		@Override
+		public int read() throws IOException {
+
+			countDownLatch.countDown();
+
+			try {
+				countDownLatch.await();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("Blocking operation was interrupted.", e);
+			}
+
+			if (index >= data.length) {
+				return -1;
+			} else {
+				return data[index++];
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static final class ChunkedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/60873b0c/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')