You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/18 07:36:48 UTC

[3/5] flink git commit: [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 23f0a38..933c7a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -90,6 +90,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -158,6 +159,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
@@ -179,6 +181,9 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 			archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty()));
 
+			BlobServer blobServer = new BlobServer(
+				flinkConfiguration,
+				testingHighAvailabilityServices.createBlobStore());
 			Props jobManagerProps = Props.create(
 				TestingJobManager.class,
 				flinkConfiguration,
@@ -186,11 +191,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				instanceManager,
 				scheduler,
-				new BlobLibraryCacheManager(
-					new BlobServer(
-						flinkConfiguration,
-						testingHighAvailabilityServices.createBlobStore()),
-					3600000L),
+				blobServer,
+				new BlobLibraryCacheManager(blobServer),
 				archive,
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
@@ -353,6 +355,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 			final Collection<JobID> recoveredJobs = new ArrayList<>(2);
 
+			BlobServer blobServer = mock(BlobServer.class);
 			Props jobManagerProps = Props.create(
 				TestingFailingHAJobManager.class,
 				flinkConfiguration,
@@ -360,7 +363,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				mock(InstanceManager.class),
 				mock(Scheduler.class),
-				new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+				blobServer,
+				new BlobLibraryCacheManager(blobServer),
 				ActorRef.noSender(),
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
@@ -397,6 +401,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			Executor ioExecutor,
 			InstanceManager instanceManager,
 			Scheduler scheduler,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			ActorRef archive,
 			RestartStrategyFactory restartStrategyFactory,
@@ -413,6 +418,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				ioExecutor,
 				instanceManager,
 				scheduler,
+				blobServer,
 				libraryCacheManager,
 				archive,
 				restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 3c75971..6a39293 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -137,14 +137,13 @@ public class JobSubmitTest {
 			// upload two dummy bytes and add their keys to the job graph as dependencies
 			BlobKey key1, key2;
 			BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
-			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			JobID jobId = null;
+			JobID jobId = jg.getJobID();
 			try {
 				key1 = bc.put(jobId, new byte[10]);
 				key2 = bc.put(jobId, new byte[10]);
 
 				// delete one of the blobs to make sure that the startup failed
-				bc.delete(key2);
+				bc.delete(jobId, key2);
 			}
 			finally {
 				bc.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index df35369..2c17b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -92,8 +93,8 @@ public class JobMasterTest extends TestLogger {
 
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
-		final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class);
-		when(libraryCacheManager.getBlobServerPort()).thenReturn(1337);
+		BlobServer blobServer = mock(BlobServer.class);
+		when(blobServer.getPort()).thenReturn(1337);
 
 		final JobGraph jobGraph = new JobGraph();
 
@@ -106,7 +107,8 @@ public class JobMasterTest extends TestLogger {
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
-				libraryCacheManager,
+				blobServer,
+				mock(BlobLibraryCacheManager.class),
 				mock(RestartStrategyFactory.class),
 				Time.of(10, TimeUnit.SECONDS),
 				null,
@@ -204,6 +206,7 @@ public class JobMasterTest extends TestLogger {
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
+				mock(BlobServer.class),
 				mock(BlobLibraryCacheManager.class),
 				mock(RestartStrategyFactory.class),
 				Time.of(10, TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 70800e5..230ca91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,9 +25,10 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -47,13 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -178,6 +177,9 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
 		CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 
+		configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+		BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
 		return Props.create(
 			TestingJobManager.class,
 			configuration,
@@ -185,7 +187,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
-			new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
+			blobServer,
+			new BlobLibraryCacheManager(blobServer),
 			ActorRef.noSender(),
 			new NoRestartStrategy.NoRestartStrategyFactory(),
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 43ff60b..6842bee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -668,7 +669,7 @@ public class TaskExecutorTest extends TestLogger {
 				Collections.<InputGateDeploymentDescriptor>emptyList());
 
 		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
-		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
 			jobId,
@@ -677,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));
@@ -1191,6 +1193,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobManagerLeaderId,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 085a386..392dc29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -145,6 +146,7 @@ public class TaskAsyncCallTest {
 	}
 	
 	private static Task createTask() throws Exception {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 		
@@ -195,6 +197,7 @@ public class TaskAsyncCallTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 1ebd4ad..ac0df36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -98,6 +99,7 @@ public class TaskStopTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			tmRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ba3e820..d4cd0cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -227,7 +228,8 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testLibraryCacheRegistrationFailed() {
 		try {
-			Task task = createTask(TestInvokableCorrect.class, mock(LibraryCacheManager.class));
+			Task task = createTask(TestInvokableCorrect.class, mock(BlobCache.class),
+				mock(LibraryCacheManager.class));
 
 			// task should be new and perfect
 			assertEquals(ExecutionState.CREATED, task.getExecutionState());
@@ -260,6 +262,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsInNetworkRegistration() {
 		try {
+			BlobCache blobCache = mock(BlobCache.class);
 			// mock a working library cache
 			LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 			when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -274,7 +277,7 @@ public class TaskTest extends TestLogger {
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-			Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
+			Task task = createTask(TestInvokableCorrect.class, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
 
 			task.registerExecutionListener(listener);
 
@@ -617,6 +620,7 @@ public class TaskTest extends TestLogger {
 		IntermediateDataSetID resultId = new IntermediateDataSetID();
 		ResultPartitionID partitionId = new ResultPartitionID();
 
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
@@ -629,7 +633,7 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 			.thenReturn(mock(TaskKvStateRegistry.class));
 
-		createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+		createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 		// Test all branches of trigger partition state check
 
@@ -638,7 +642,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// PartitionProducerDisposedException
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -654,7 +658,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Any other exception
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -671,7 +675,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// TimeoutException handled special => retry
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -702,7 +706,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Success
-			Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -882,26 +886,30 @@ public class TaskTest extends TestLogger {
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, libCache, config, new ExecutionConfig());
+		return createTask(invokable, blobCache,libCache, config, new ExecutionConfig());
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, libCache, config, execConfig);
+		return createTask(invokable, blobCache,libCache, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache) throws IOException {
 
-		return createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobCache,libCache, new Configuration(), new ExecutionConfig());
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache,
 			Configuration config,
 			ExecutionConfig execConfig) throws IOException {
@@ -916,21 +924,23 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
+		return createTask(invokable, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			BlobCache blobCache,
 			LibraryCacheManager libCache,
 			NetworkEnvironment networkEnvironment,
 			ResultPartitionConsumableNotifier consumableNotifier,
 			PartitionProducerStateChecker partitionProducerStateChecker,
 			Executor executor) throws IOException {
-		return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobCache, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
 	}
 	
 	private Task createTask(
 		Class<? extends AbstractInvokable> invokable,
+		BlobCache blobCache,
 		LibraryCacheManager libCache,
 		NetworkEnvironment networkEnvironment,
 		ResultPartitionConsumableNotifier consumableNotifier,
@@ -991,6 +1001,7 @@ public class TaskTest extends TestLogger {
 			taskManagerConnection,
 			inputSplitProvider,
 			checkpointResponder,
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
new file mode 100644
index 0000000..37c141d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
+ * exception.
+ *
+ * <p>The blocking flag is only read and written while holding {@code lock}, so an
+ * {@link #unblock()} call racing with {@link #invoke()} cannot be missed.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+	// guarded by lock
+	private static boolean blocking = true;
+	private static final Object lock = new Object();
+
+	/**
+	 * Waits until {@link #unblock()} has been called and then fails.
+	 *
+	 * @throws Exception always; a {@link RuntimeException} once unblocked, or an
+	 *                   {@link InterruptedException} if the waiting thread is interrupted
+	 */
+	@Override
+	public void invoke() throws Exception {
+		synchronized (lock) {
+			// The check must happen under the lock: if it were done outside, unblock() could
+			// flip the flag and notify between the check and wait(), and this thread would
+			// then wait forever for a notification that has already been delivered.
+			while (blocking) {
+				lock.wait();
+			}
+		}
+		throw new RuntimeException("This exception is expected.");
+	}
+
+	/**
+	 * Releases all tasks currently blocked in {@link #invoke()} and lets later invocations fail
+	 * immediately. The flag is static and is not reset by this class, so it affects every instance.
+	 */
+	public static void unblock() {
+		synchronized (lock) {
+			blocking = false;
+			lock.notifyAll();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c1df5a3..229f1eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -178,6 +179,7 @@ public class JvmExitOnFatalErrorTest {
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
 						new NoOpCheckpointResponder(),
+						mock(BlobCache.class),
 						new FallbackLibraryCacheManager(),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 1b9ee48..95da981 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -264,14 +264,15 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
       components._1,
       components._2,
       components._3,
-      ActorRef.noSender,
       components._4,
+      ActorRef.noSender,
       components._5,
+      components._6,
       highAvailabilityServices.getJobManagerLeaderElectionService(
         HighAvailabilityServices.DEFAULT_JOB_ID),
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
-      components._8,
+      components._9,
       None)
 
     _system.actorOf(props)

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e5655bb..87f8088 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,6 +28,7 @@ import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -110,6 +111,7 @@ class TestingCluster(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -127,6 +129,7 @@ class TestingCluster(
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index f50a832..8b9ce15 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -34,15 +35,16 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-/** JobManager implementation extended by testing messages
-  *
-  */
+/**
+ * JobManager implementation extended by testing messages
+ */
 class TestingJobManager(
     flinkConfiguration: Configuration,
     futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -58,6 +60,7 @@ class TestingJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 3b8178b..82642ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -156,6 +157,7 @@ public class BlockingCheckpointsTest {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
+				mock(BlobCache.class),
 				new FallbackLibraryCacheManager(),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 82e4f31..14ae733 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -274,6 +275,7 @@ public class InterruptSensitiveRestoreTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			new FallbackLibraryCacheManager(),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 702d833..79e9583 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -153,6 +154,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			mock(BlobCache.class),
 			new FallbackLibraryCacheManager(),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 09e9a1b..08c3207 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -796,6 +797,7 @@ public class StreamTaskTest extends TestLogger {
 			StreamConfig taskConfig,
 			Configuration taskManagerConfig) throws Exception {
 
+		BlobCache blobCache = mock(BlobCache.class);
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 
@@ -844,6 +846,7 @@ public class StreamTaskTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			blobCache,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index b539961..bd72d6d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,6 +59,7 @@ class TestingYarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -73,6 +75,7 @@ class TestingYarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a2d1668..b8dacee 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -49,7 +50,8 @@ import scala.language.postfixOps
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
@@ -61,6 +63,7 @@ class YarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -76,6 +79,7 @@ class YarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,