You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/14 10:32:11 UTC

[2/2] flink git commit: [FLINK-6180] [rpc] Remove TestingSerialRpcService

[FLINK-6180] [rpc] Remove TestingSerialRpcService

The TestingSerialRpcService produces thread interleavings which do not occur
when the code is executed with a proper RpcService implementation. Because of
this, test cases can wrongly fail or wrongly succeed. To avoid this problem,
this commit removes the TestingSerialRpcService and adapts all existing tests
which previously used it.

Remove TestingSerialRpcService from MesosResourceManagerTest

Remove TestingSerialRpcService from ResourceManagerJobMasterTest

Remove TestingSerialRpcService from ResourceManagerTaskExecutorTest

Remove TestingSerialRpcService from ResourceManagerTest

Remove TestingSerialRpcService from JobMasterTest

Remove TestingSerialRpcService from TaskExecutorITCase

Remove TestingSerialRpcService from TaskExecutorTest

Remove TestingSerialRpcService from SlotPoolTest

Delete TestingSerialRpcService

This closes #4516.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c84f8b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c84f8b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c84f8b9

Branch: refs/heads/master
Commit: 3c84f8b9b9fa36d6fbb2d340015ed151a9771e1f
Parents: 4c0fa6e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 10 14:07:09 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 14 12:23:17 2017 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |  13 +-
 .../apache/flink/runtime/instance/SlotPool.java |   4 +-
 .../flink/runtime/instance/SlotPoolGateway.java |   3 +-
 .../resourcemanager/ResourceManager.java        |   2 -
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |   3 +
 .../taskexecutor/slot/TaskSlotTable.java        |   2 -
 .../clusterframework/ResourceManagerTest.java   |  31 +-
 .../flink/runtime/instance/SlotPoolTest.java    | 334 ++++++++------
 .../flink/runtime/jobmaster/JobMasterTest.java  |  27 +-
 .../resourcemanager/ResourceManagerHATest.java  |  31 +-
 .../ResourceManagerJobMasterTest.java           |  33 +-
 .../ResourceManagerTaskExecutorTest.java        |  25 +-
 .../runtime/rpc/TestingSerialRpcService.java    | 440 -------------------
 .../taskexecutor/TaskExecutorITCase.java        |  21 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  90 ++--
 15 files changed, 382 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index f9e35a9..e81a2de 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -58,7 +58,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActio
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -205,14 +205,14 @@ public class MesosResourceManagerTest extends TestLogger {
 	static class Context implements AutoCloseable {
 
 		// services
-		TestingSerialRpcService rpcService;
+		TestingRpcService rpcService;
 		TestingFatalErrorHandler fatalErrorHandler;
 		MockMesosResourceManagerRuntimeServices rmServices;
 
 		// RM
 		ResourceManagerConfiguration rmConfiguration;
 		ResourceID rmResourceID;
-		static final String RM_ADDRESS = "/resourceManager";
+		static final String RM_ADDRESS = "resourceManager";
 		TestingMesosResourceManager resourceManager;
 
 		// domain objects for test purposes
@@ -239,7 +239,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		 * Create mock RM dependencies.
 		 */
 		Context() throws Exception {
-			rpcService = new TestingSerialRpcService();
+			rpcService = new TestingRpcService();
 			fatalErrorHandler = new TestingFatalErrorHandler();
 			rmServices = new MockMesosResourceManagerRuntimeServices();
 
@@ -300,6 +300,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final TestingLeaderElectionService rmLeaderElectionService;
 			public final JobLeaderIdService jobLeaderIdService;
 			public final SlotManager slotManager;
+			public final CompletableFuture<Boolean> slotManagerStarted;
 			public ResourceManagerActions rmActions;
 
 			public UUID rmLeaderSessionId;
@@ -312,6 +313,7 @@ public class MesosResourceManagerTest extends TestLogger {
 				heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
 				metricRegistry = mock(MetricRegistry.class);
 				slotManager = mock(SlotManager.class);
+				slotManagerStarted = new CompletableFuture<>();
 				jobLeaderIdService = new JobLeaderIdService(
 					highAvailabilityServices,
 					rpcService.getScheduledExecutor(),
@@ -321,6 +323,7 @@ public class MesosResourceManagerTest extends TestLogger {
 					@Override
 					public Object answer(InvocationOnMock invocation) throws Throwable {
 						rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class);
+						slotManagerStarted.complete(true);
 						return null;
 					}
 				}).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class));
@@ -426,6 +429,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		 */
 		public MesosWorkerStore.Worker allocateWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) throws Exception {
 			when(rmServices.workerStore.newTaskID()).thenReturn(taskID);
+			rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			rmServices.rmActions.allocateResource(resourceProfile);
 			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile);
 
@@ -501,6 +505,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
 			// allocate a worker
 			when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
+			rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			rmServices.rmActions.allocateResource(resourceProfile1);
 
 			// verify that a new worker was persisted, the internal state was updated, the task router was notified,

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index de2b3e5..bf3de25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -613,7 +613,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * @param resourceID The id of the TaskManager
 	 */
 	@Override
-	public void releaseTaskManager(final ResourceID resourceID) {
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
 		if (registeredTaskManagers.remove(resourceID)) {
 			availableSlots.removeAllForTaskManager(resourceID);
 
@@ -622,6 +622,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 				slot.releaseSlot();
 			}
 		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 8d4f2a5..32a9af5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
@@ -74,7 +75,7 @@ public interface SlotPoolGateway extends RpcGateway {
 
 	void registerTaskManager(ResourceID resourceID);
 
-	void releaseTaskManager(ResourceID resourceID);
+	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
 
 	CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c2b0590..e8ec0e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -899,8 +899,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				clearState();
 
 				slotManager.suspend();
-
-				leaderSessionId = null;
 			}
 		});
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index f557447..d51607e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -189,6 +189,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 		if (rpcMethod != null) {
 			try {
+				// this supports declaration of anonymous classes
+				rpcMethod.setAccessible(true);
+
 				if (rpcMethod.getReturnType().equals(Void.TYPE)) {
 					// No return value to send back
 					rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 5c51c7c..3634df0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -291,8 +291,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
-			LOG.info("Free slot {}.", allocationId, cause);
-
 			final JobID jobId = taskSlot.getJobId();
 
 			if (taskSlot.markFree()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 3ca0327..9ad251b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -48,9 +48,10 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -64,6 +65,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
 import scala.Option;
 
 import java.util.ArrayList;
@@ -93,6 +96,8 @@ public class ResourceManagerTest extends TestLogger {
 
 	private static Configuration config = new Configuration();
 
+	private final Time timeout = Time.seconds(10L);
+
 	private TestingHighAvailabilityServices highAvailabilityServices;
 	private TestingLeaderRetrievalService jobManagerLeaderRetrievalService;
 
@@ -479,7 +484,7 @@ public class ResourceManagerTest extends TestLogger {
 		final ResourceID resourceManagerResourceID = ResourceID.generate();
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		final TestingRpcService rpcService = new TestingRpcService();
 		rpcService.registerGateway(taskManagerAddress, taskExecutorGateway);
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -519,18 +524,20 @@ public class ResourceManagerTest extends TestLogger {
 
 			resourceManager.start();
 
+			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
 			final UUID rmLeaderSessionId = UUID.randomUUID();
 			rmLeaderElectionService.isLeader(rmLeaderSessionId);
 
 			final SlotReport slotReport = new SlotReport();
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+			CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerTaskExecutor(
 				rmLeaderSessionId,
 				taskManagerAddress,
 				taskManagerResourceID,
 				slotReport,
-				Time.milliseconds(0L));
-			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+				timeout);
+			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -557,7 +564,7 @@ public class ResourceManagerTest extends TestLogger {
 			// run the timeout runnable to simulate a heartbeat timeout
 			timeoutRunnable.run();
 
-			verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class));
+			verify(taskExecutorGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class));
 
 		} finally {
 			rpcService.stopService();
@@ -575,7 +582,7 @@ public class ResourceManagerTest extends TestLogger {
 
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
-		final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		final TestingRpcService rpcService = new TestingRpcService();
 		rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -620,17 +627,19 @@ public class ResourceManagerTest extends TestLogger {
 
 			resourceManager.start();
 
+			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
 			rmLeaderElectionService.isLeader(rmLeaderId);
 
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,
 				jobMasterAddress,
 				jobId,
-				Time.milliseconds(0L));
-			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+				timeout);
+			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof JobMasterRegistrationSuccess);
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -657,7 +666,7 @@ public class ResourceManagerTest extends TestLogger {
 			// run the timeout runnable to simulate a heartbeat timeout
 			timeoutRunnable.run();
 
-			verify(jobMasterGateway).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class));
+			verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class));
 
 		} finally {
 			rpcService.stopService();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index aeceb59..68c43f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -28,15 +28,15 @@ import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.util.List;
 import java.util.UUID;
@@ -51,218 +51,268 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SlotPoolTest extends TestLogger {
 
+	private final Time timeout = Time.seconds(10L);
+
 	private RpcService rpcService;
 
 	private JobID jobId;
 
-	private MainThreadValidatorUtil mainThreadValidatorUtil;
-
-	private SlotPool slotPool;
-
-	private ResourceManagerGateway resourceManagerGateway;
-
 	@Before
 	public void setUp() throws Exception {
-
-		this.rpcService = new TestingSerialRpcService();
+		this.rpcService = new TestingRpcService();
 		this.jobId = new JobID();
-		this.slotPool = new SlotPool(rpcService, jobId);
-
-		this.mainThreadValidatorUtil = new MainThreadValidatorUtil(slotPool);
-
-		mainThreadValidatorUtil.enterMainThread();
-
-		final String jobManagerAddress = "foobar";
-
-		slotPool.start(UUID.randomUUID(), jobManagerAddress);
-
-		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway
-			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
-
-		slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
 	}
 
 	@After
 	public void tearDown() throws Exception {
-		mainThreadValidatorUtil.exitMainThread();
+		rpcService.stopService();
 	}
 
 	@Test
 	public void testAllocateSimpleSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerTaskManager(resourceID);
-
-		ScheduledUnit task = mock(ScheduledUnit.class);
-		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
-		assertFalse(future.isDone());
-
-		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
-		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
-
-		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
-		assertEquals(resourceID, slot.getTaskManagerID());
-		assertEquals(jobId, slot.getJobID());
-		assertEquals(slotPool.getSlotOwner(), slot.getOwner());
-		assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
+		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		try {
+			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+			ResourceID resourceID = new ResourceID("resource");
+			slotPoolGateway.registerTaskManager(resourceID);
+
+			ScheduledUnit task = mock(ScheduledUnit.class);
+			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, timeout);
+			assertFalse(future.isDone());
+
+			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+			SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+			assertTrue(future.isDone());
+			assertTrue(slot.isAlive());
+			assertEquals(resourceID, slot.getTaskManagerID());
+			assertEquals(jobId, slot.getJobID());
+			assertEquals(slotPool.getSlotOwner(), slot.getOwner());
+			assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
+		} finally {
+			slotPool.shutDown();
+		}
 	}
 
 	@Test
 	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerTaskManager(resourceID);
+		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		try {
+			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+			ResourceID resourceID = new ResourceID("resource");
+			slotPool.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
+			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
-		assertFalse(future1.isDone());
-		assertFalse(future2.isDone());
+			assertFalse(future1.isDone());
+			assertFalse(future2.isDone());
 
-		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-		verify(resourceManagerGateway, times(2))
-			.requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
+				.requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
-		final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
+			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
 
-		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
+			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
-		assertFalse(future2.isDone());
+			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			assertTrue(future1.isDone());
+			assertFalse(future2.isDone());
 
-		// return this slot to pool
-		slot1.releaseSlot();
+			// return this slot to pool
+			slot1.releaseSlot();
 
-		// second allocation fulfilled by previous slot returning
-		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-		assertTrue(future2.isDone());
+			// second allocation fulfilled by previous slot returning
+			SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+			assertTrue(future2.isDone());
 
-		assertNotEquals(slot1, slot2);
-		assertTrue(slot1.isReleased());
-		assertTrue(slot2.isAlive());
-		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-		assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
+			assertNotEquals(slot1, slot2);
+			assertTrue(slot1.isReleased());
+			assertTrue(slot2.isAlive());
+			assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+			assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+			assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
+		} finally {
+			slotPool.shutDown();
+		}
 	}
 
 	@Test
 	public void testAllocateWithFreeSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerTaskManager(resourceID);
+		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
-		assertFalse(future1.isDone());
+		try {
+			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+			ResourceID resourceID = new ResourceID("resource");
+			slotPoolGateway.registerTaskManager(resourceID);
 
-		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			assertFalse(future1.isDone());
 
-		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
-		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
+			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
+			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-		// return this slot to pool
-		slot1.releaseSlot();
+			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			assertTrue(future1.isDone());
 
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
+			// return this slot to pool
+			slot1.releaseSlot();
 
-		// second allocation fulfilled by previous slot returning
-		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-		assertTrue(future2.isDone());
+			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
-		assertNotEquals(slot1, slot2);
-		assertTrue(slot1.isReleased());
-		assertTrue(slot2.isAlive());
-		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+			// second allocation fulfilled by previous slot returning
+			SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+			assertTrue(future2.isDone());
+
+			assertNotEquals(slot1, slot2);
+			assertTrue(slot1.isReleased());
+			assertTrue(slot2.isAlive());
+			assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+			assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+		} finally {
+			slotPool.shutDown();
+		}
 	}
 
 	@Test
 	public void testOfferSlot() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerTaskManager(resourceID);
+		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		try {
+			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+			ResourceID resourceID = new ResourceID("resource");
+			slotPoolGateway.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
-		assertFalse(future.isDone());
+			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			assertFalse(future.isDone());
 
-		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
-		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-		// slot from unregistered resource
-		AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertFalse(slotPool.offerSlot(invalid).get());
+			// a slot offered from an unregistered resource must be rejected (every wait below is bounded by the test timeout)
+			AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertFalse(slotPoolGateway.offerSlot(invalid).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
 
-		AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+			AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
 
-		// we'll also accept non requested slots
-		assertTrue(slotPool.offerSlot(notRequested).get());
+			// slots that were never requested are accepted as well
+			assertTrue(slotPoolGateway.offerSlot(notRequested).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
 
-		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 
-		// accepted slot
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
-		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
+			// the slot matching the pending request is accepted and fulfils the allocation future
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			assertTrue(slot.isAlive());
 
-		// duplicated offer with using slot
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
-		assertTrue(future.isDone());
-		assertTrue(slot.isAlive());
+			// duplicated offer while the slot is still in use
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+			assertTrue(slot.isAlive());
 
-		// duplicated offer with free slot
-		slot.releaseSlot();
-		assertTrue(slot.isReleased());
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
+			// duplicated offer after the slot has been released
+			slot.releaseSlot();
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+		} finally {
+			slotPool.shutDown();
+		}
 	}
 
 	@Test
 	public void testReleaseResource() throws Exception {
-		ResourceID resourceID = new ResourceID("resource");
-		slotPool.registerTaskManager(resourceID);
+		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+
+		final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
+
+		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
+			@Override
+			public void returnAllocatedSlot(Slot slot) {
+				super.returnAllocatedSlot(slot);
+
+				slotReturnFuture.complete(true);
+			}
+		};
+
+		try {
+			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+			ResourceID resourceID = new ResourceID("resource");
+			slotPoolGateway.registerTaskManager(resourceID);
+
+			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+
+			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+
+			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
+			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			assertTrue(future1.isDone());
+			assertFalse(future2.isDone());
 
-		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+			slotPoolGateway.releaseTaskManager(resourceID);
 
-		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+			// bounded wait until the slot has been returned, so a lost callback fails the test instead of hanging it
+			slotReturnFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
+			assertTrue(slot1.isReleased());
 
-		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot).get());
+			// slot released and not usable, second allocation still not fulfilled
+			Thread.sleep(10);
+			assertFalse(future2.isDone());
+		} finally {
+			slotPool.shutDown();
+		}
+	}
+
+	private static ResourceManagerGateway createResourceManagerGatewayMock() {
+		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+		when(resourceManagerGateway
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
-		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-		assertTrue(future1.isDone());
-		assertFalse(future2.isDone());
+		return resourceManagerGateway;
+	}
 
-		slotPool.releaseTaskManager(resourceID);
-		assertTrue(slot1.isReleased());
+	private static SlotPoolGateway setupSlotPool(
+			SlotPool slotPool,
+			ResourceManagerGateway resourceManagerGateway) throws Exception {
+		final String jobManagerAddress = "foobar";
+
+		slotPool.start(UUID.randomUUID(), jobManagerAddress);
+
+		slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
 
-		// slot released and not usable, second allocation still not fulfilled
-		Thread.sleep(10);
-		assertFalse(future2.isDone());
+		return slotPool.getSelfGateway(SlotPoolGateway.class);
 	}
 
 	static AllocatedSlot createAllocatedSlot(

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0c4d376..df35369 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -33,8 +33,9 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -62,6 +63,8 @@ import static org.mockito.Mockito.when;
 @PrepareForTest(BlobLibraryCacheManager.class)
 public class JobMasterTest extends TestLogger {
 
+	private final Time testingTimeout = Time.seconds(10L);
+
 	@Test
 	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
 		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
@@ -81,7 +84,7 @@ public class JobMasterTest extends TestLogger {
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
 
 		final long heartbeatInterval = 1L;
@@ -89,6 +92,8 @@ public class JobMasterTest extends TestLogger {
 
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+		final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class);
+		when(libraryCacheManager.getBlobServerPort()).thenReturn(1337);
 
 		final JobGraph jobGraph = new JobGraph();
 
@@ -101,7 +106,7 @@ public class JobMasterTest extends TestLogger {
 				haServices,
 				heartbeatServices,
 				Executors.newScheduledThreadPool(1),
-				mock(BlobLibraryCacheManager.class),
+				libraryCacheManager,
 				mock(RestartStrategyFactory.class),
 				Time.of(10, TimeUnit.SECONDS),
 				null,
@@ -111,8 +116,14 @@ public class JobMasterTest extends TestLogger {
 
 			jobMaster.start(jmLeaderId);
 
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
 			// register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, Time.milliseconds(0L));
+			CompletableFuture<RegistrationResponse> registrationResponse = jobMasterGateway
+				.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, testingTimeout);
+
+			// wait for the completion of the registration
+			registrationResponse.get();
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
 			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
@@ -124,7 +135,7 @@ public class JobMasterTest extends TestLogger {
 			Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
 
 			ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
-			verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+			verify(scheduledExecutor, timeout(testingTimeout.toMilliseconds())).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 
 			Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
 
@@ -136,7 +147,7 @@ public class JobMasterTest extends TestLogger {
 			// run the timeout runnable to simulate a heartbeat timeout
 			timeoutRunnable.run();
 
-			verify(taskExecutorGateway).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+			verify(taskExecutorGateway, timeout(testingTimeout.toMilliseconds())).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -179,7 +190,7 @@ public class JobMasterTest extends TestLogger {
 		)).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
 			heartbeatInterval, rmLeaderId, rmResourceId)));
 
-		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -207,7 +218,7 @@ public class JobMasterTest extends TestLogger {
 			rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId);
 
 			// register job manager success will trigger monitor heartbeat target between jm and rm
-			verify(resourceManagerGateway).registerJobManager(
+			verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager(
 				eq(rmLeaderId),
 				eq(jmLeaderId),
 				eq(jmResourceId),

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 986f848..c213752 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
@@ -35,6 +35,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.mock;
 
@@ -46,9 +47,17 @@ public class ResourceManagerHATest extends TestLogger {
 	@Test
 	public void testGrantAndRevokeLeadership() throws Exception {
 		ResourceID rmResourceId = ResourceID.generate();
-		RpcService rpcService = new TestingSerialRpcService();
+		RpcService rpcService = new TestingRpcService();
+
+		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
+
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService() {
+			@Override
+			public void confirmLeaderSessionID(UUID leaderId) {
+				leaderSessionIdFuture.complete(leaderId);
+			}
+		};
 
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
@@ -73,6 +82,8 @@ public class ResourceManagerHATest extends TestLogger {
 
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
+		CompletableFuture<UUID> revokedLeaderIdFuture = new CompletableFuture<>();
+
 		final ResourceManager resourceManager =
 			new StandaloneResourceManager(
 				rpcService,
@@ -84,17 +95,25 @@ public class ResourceManagerHATest extends TestLogger {
 				resourceManagerRuntimeServices.getSlotManager(),
 				metricRegistry,
 				resourceManagerRuntimeServices.getJobLeaderIdService(),
-				testingFatalErrorHandler);
+				testingFatalErrorHandler) {
+
+				@Override
+				public void revokeLeadership() {
+					super.revokeLeadership();
+					runAsync(
+						() -> revokedLeaderIdFuture.complete(getLeaderSessionId()));
+				}
+			};
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
 		Assert.assertEquals(null, resourceManager.getLeaderSessionId());
 		final UUID leaderId = UUID.randomUUID();
 		leaderElectionService.isLeader(leaderId);
 		// after grant leadership, resourceManager's leaderId has value
-		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionId());
+		Assert.assertEquals(leaderId, leaderSessionIdFuture.get());
 		// then revoke leadership, resourceManager's leaderId is null again
 		leaderElectionService.notLeader();
-		Assert.assertEquals(null, resourceManager.getLeaderSessionId());
+		Assert.assertEquals(null, revokedLeaderIdFuture.get());
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
 			testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 10d6a72..139bfc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -50,13 +50,13 @@ import static org.mockito.Mockito.*;
 
 public class ResourceManagerJobMasterTest extends TestLogger {
 
-	private TestingSerialRpcService rpcService;
+	private TestingRpcService rpcService;
 
-	private final Time timeout = Time.milliseconds(0L);
+	private final Time timeout = Time.seconds(10L);
 
 	@Before
 	public void setup() throws Exception {
-		rpcService = new TestingSerialRpcService();
+		rpcService = new TestingRpcService();
 	}
 
 	@After
@@ -77,17 +77,18 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
-		CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderID,
 			jmResourceId,
 			jobMasterAddress,
 			jobID,
 			timeout);
-		RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS);
+		RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		assertTrue(response instanceof JobMasterRegistrationSuccess);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -108,11 +109,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
 			differentLeaderSessionID,
 			jmLeaderID,
 			jmResourceId,
@@ -139,13 +141,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
 			rmLeaderSessionId,
 			differentLeaderSessionID,
 			jmResourceId,
@@ -172,13 +175,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
-		CompletableFuture<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,
@@ -204,21 +208,26 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(
+			resourceManagerLeaderElectionService,
+			jobID,
+			jobMasterLeaderRetrievalService,
+			testingFatalErrorHandler);
+		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		JobID unknownJobIDToHAServices = new JobID();
 		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		CompletableFuture<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> declineFuture = rmGateway.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,
 			jobMasterAddress,
 			unknownJobIDToHAServices,
 			timeout);
-		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
+		RegistrationResponse response = declineFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		assertTrue(response instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index fc96f0d..616ed5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -49,7 +49,9 @@ import static org.mockito.Mockito.mock;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
 
-	private TestingSerialRpcService rpcService;
+	private final Time timeout = Time.seconds(10L);
+
+	private TestingRpcService rpcService;
 
 	private SlotReport slotReport = new SlotReport();
 
@@ -61,19 +63,22 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private StandaloneResourceManager resourceManager;
 
+	private ResourceManagerGateway rmGateway;
+
 	private UUID leaderSessionId;
 
 	private TestingFatalErrorHandler testingFatalErrorHandler;
 
 	@Before
 	public void setup() throws Exception {
-		rpcService = new TestingSerialRpcService();
+		rpcService = new TestingRpcService();
 
 		taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
 		resourceManagerResourceID = ResourceID.generate();
 		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
+		rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 		leaderSessionId = grantLeadership(rmLeaderElectionService);
 	}
 
@@ -90,13 +95,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
-			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+				rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
+				rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -116,8 +121,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			UUID differentLeaderSessionID = UUID.randomUUID();
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
-			assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+				rmGateway.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+			assertTrue(unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();
@@ -134,8 +139,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
-			assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+				rmGateway.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, timeout);
+			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
deleted file mode 100644
index cb38f6f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.util.DirectExecutorService;
-import org.apache.flink.util.Preconditions;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An RPC Service implementation for testing. This RPC service directly executes all asynchronous
- * calls one by one in the calling thread.
- */
-public class TestingSerialRpcService implements RpcService {
-
-	private final DirectExecutorService executorService;
-	private final ScheduledExecutorService scheduledExecutorService;
-	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
-	private final CompletableFuture<Void> terminationFuture;
-
-	private final ScheduledExecutor scheduledExecutorServiceAdapter;
-
-	public TestingSerialRpcService() {
-		executorService = new DirectExecutorService();
-		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-		this.registeredConnections = new ConcurrentHashMap<>(16);
-		this.terminationFuture = new CompletableFuture<>();
-
-		this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
-	}
-
-	@Override
-	public ScheduledFuture<?> scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
-		try {
-			unit.sleep(delay);
-			runnable.run();
-
-			return new DoneScheduledFuture<Void>(null);
-		} catch (Throwable e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void execute(Runnable runnable) {
-		runnable.run();
-	}
-
-	@Override
-	public <T> CompletableFuture<T> execute(Callable<T> callable) {
-		try {
-			T result = callable.call();
-
-			return CompletableFuture.completedFuture(result);
-		} catch (Exception e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-	}
-
-	@Override
-	public Executor getExecutor() {
-		return executorService;
-	}
-
-	public ScheduledExecutor getScheduledExecutor() {
-		return scheduledExecutorServiceAdapter;
-	}
-
-	@Override
-	public void stopService() {
-		executorService.shutdown();
-
-		scheduledExecutorService.shutdown();
-
-		boolean terminated = false;
-
-		try {
-			terminated = scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
-		} catch (InterruptedException e) {
-			Thread.currentThread().interrupt();
-		}
-
-		if (!terminated) {
-			List<Runnable> runnables = scheduledExecutorService.shutdownNow();
-
-			for (Runnable runnable : runnables) {
-				runnable.run();
-			}
-		}
-
-		registeredConnections.clear();
-		terminationFuture.complete(null);
-	}
-
-	@Override
-	public CompletableFuture<Void> getTerminationFuture() {
-		return terminationFuture;
-	}
-
-	@Override
-	public void stopServer(RpcServer selfGateway) {
-		registeredConnections.remove(selfGateway.getAddress());
-	}
-
-	@Override
-	public <S extends RpcEndpoint & RpcGateway> RpcServer startServer(S rpcEndpoint) {
-		final String address = UUID.randomUUID().toString();
-
-		InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint);
-		ClassLoader classLoader = getClass().getClassLoader();
-
-		Set<Class<? extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
-
-		implementedRpcGateways.add(RpcServer.class);
-
-
-		@SuppressWarnings("unchecked")
-		RpcServer rpcServer = (RpcServer) Proxy.newProxyInstance(
-			classLoader,
-			implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
-			akkaInvocationHandler);
-
-		// register self
-		registeredConnections.putIfAbsent(rpcServer.getAddress(), rpcServer);
-
-		return rpcServer;
-	}
-
-	@Override
-	public String getAddress() {
-		return "";
-	}
-
-	@Override
-	public int getPort() {
-		return -1;
-	}
-
-	@Override
-	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
-		RpcGateway gateway = registeredConnections.get(address);
-
-		if (gateway != null) {
-			if (clazz.isAssignableFrom(gateway.getClass())) {
-				@SuppressWarnings("unchecked")
-				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
-			} else {
-				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
-			}
-		} else {
-			return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// connections
-	// ------------------------------------------------------------------------
-
-	public void registerGateway(String address, RpcGateway gateway) {
-		checkNotNull(address);
-		checkNotNull(gateway);
-
-		if (registeredConnections.putIfAbsent(address, gateway) != null) {
-			throw new IllegalStateException("a gateway is already registered under " + address);
-		}
-	}
-
-	public void clearGateways() {
-		registeredConnections.clear();
-	}
-
-	private static final class TestingSerialInvocationHandler<T extends RpcEndpoint & RpcGateway> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
-
-		private final T rpcEndpoint;
-
-		/** default timeout for asks */
-		private final Time timeout;
-
-		private final String address;
-
-		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
-			this(address, rpcEndpoint, Time.seconds(10));
-		}
-
-		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) {
-			this.rpcEndpoint = rpcEndpoint;
-			this.timeout = timeout;
-			this.address = address;
-		}
-
-		@Override
-		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-			Class<?> declaringClass = method.getDeclaringClass();
-			if (declaringClass.equals(MainThreadExecutable.class) ||
-				declaringClass.equals(Object.class) ||
-				declaringClass.equals(StartStoppable.class) ||
-				declaringClass.equals(RpcServer.class) ||
-				declaringClass.equals(RpcGateway.class)) {
-				return method.invoke(this, args);
-			} else {
-				final String methodName = method.getName();
-				Class<?>[] parameterTypes = method.getParameterTypes();
-				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-				Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
-
-				Class<?> returnType = method.getReturnType();
-
-				if (returnType.equals(CompletableFuture.class)) {
-					try {
-						Object result = handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
-						return CompletableFuture.completedFuture(result);
-					} catch (Throwable e) {
-						return FutureUtils.completedExceptionally(e);
-					}
-				} else {
-					return handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
-				}
-			}
-		}
-
-		/**
-		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
-		 * method with the provided method arguments. If the method has a return value, it is returned
-		 * to the sender of the call.
-		 */
-		private Object handleRpcInvocationSync(final String methodName,
-			final Class<?>[] parameterTypes,
-			final Object[] args,
-			final Time futureTimeout) throws Exception {
-			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
-			Object result = rpcMethod.invoke(rpcEndpoint, args);
-
-			if (result instanceof Future) {
-				Future<?> future = (Future<?>) result;
-				return future.get(futureTimeout.getSize(), futureTimeout.getUnit());
-			} else {
-				return result;
-			}
-		}
-
-		@Override
-		public void runAsync(Runnable runnable) {
-			runnable.run();
-		}
-
-		@Override
-		public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
-			try {
-				return CompletableFuture.completedFuture(callable.call());
-			} catch (Throwable e) {
-				return FutureUtils.completedExceptionally(e);
-			}
-		}
-
-		@Override
-		public void scheduleRunAsync(final Runnable runnable, final long delay) {
-			try {
-				TimeUnit.MILLISECONDS.sleep(delay);
-				runnable.run();
-			} catch (Throwable e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public String getAddress() {
-			return address;
-		}
-
-		// this is not a real hostname but the address above is also not a real akka RPC address
-		// and we keep it that way until actually needed by a test case
-		@Override
-		public String getHostname() {
-			return address;
-		}
-
-		@Override
-		public void start() {
-			// do nothing
-		}
-
-		@Override
-		public void stop() {
-			// do nothing
-		}
-
-		/**
-		 * Look up the rpc method on the given {@link RpcEndpoint} instance.
-		 *
-		 * @param methodName     Name of the method
-		 * @param parameterTypes Parameter types of the method
-		 * @return Method of the rpc endpoint
-		 * @throws NoSuchMethodException Thrown if the method with the given name and parameter types
-		 *                               cannot be found at the rpc endpoint
-		 */
-		private Method lookupRpcMethod(final String methodName,
-			final Class<?>[] parameterTypes) throws NoSuchMethodException {
-			return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
-		}
-
-		// ------------------------------------------------------------------------
-		//  Helper methods
-		// ------------------------------------------------------------------------
-
-		/**
-		 * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
-		 * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
-		 * timeout is returned.
-		 *
-		 * @param parameterAnnotations Parameter annotations
-		 * @param args                 Array of arguments
-		 * @param defaultTimeout       Default timeout to return if no {@link RpcTimeout} annotated parameter
-		 *                             has been found
-		 * @return Timeout extracted from the array of arguments or the default timeout
-		 */
-		private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
-			Time defaultTimeout) {
-			if (args != null) {
-				Preconditions.checkArgument(parameterAnnotations.length == args.length);
-
-				for (int i = 0; i < parameterAnnotations.length; i++) {
-					if (isRpcTimeout(parameterAnnotations[i])) {
-						if (args[i] instanceof Time) {
-							return (Time) args[i];
-						} else {
-							throw new RuntimeException("The rpc timeout parameter must be of type " +
-								Time.class.getName() + ". The type " + args[i].getClass().getName() +
-								" is not supported.");
-						}
-					}
-				}
-			}
-
-			return defaultTimeout;
-		}
-
-		/**
-		 * Checks whether any of the annotations is of type {@link RpcTimeout}
-		 *
-		 * @param annotations Array of annotations
-		 * @return True if {@link RpcTimeout} was found; otherwise false
-		 */
-		private static boolean isRpcTimeout(Annotation[] annotations) {
-			for (Annotation annotation : annotations) {
-				if (annotation.annotationType().equals(RpcTimeout.class)) {
-					return true;
-				}
-			}
-
-			return false;
-		}
-
-	}
-
-	private static class DoneScheduledFuture<V> implements ScheduledFuture<V> {
-
-		private final V value;
-
-		private DoneScheduledFuture(V value) {
-			this.value = value;
-		}
-
-		@Override
-		public long getDelay(TimeUnit unit) {
-			return 0L;
-		}
-
-		@Override
-		public int compareTo(Delayed o) {
-			return 0;
-		}
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			return false;
-		}
-
-		@Override
-		public boolean isCancelled() {
-			return false;
-		}
-
-		@Override
-		public boolean isDone() {
-			return true;
-		}
-
-		@Override
-		public V get() throws InterruptedException, ExecutionException {
-			return value;
-		}
-
-		@Override
-		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-			return value;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c84f8b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 4c87671..90f731d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -47,7 +48,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.Matchers;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -76,6 +78,8 @@ import static org.mockito.Mockito.when;
 
 public class TaskExecutorITCase extends TestLogger {
 
+	private final Time timeout = Time.seconds(10L);
+
 	@Test
 	public void testSlotAllocation() throws Exception {
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -98,7 +102,7 @@ public class TaskExecutorITCase extends TestLogger {
 		testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 		testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId));
 
-		TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		TestingRpcService rpcService = new TestingRpcService();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.milliseconds(500L),
 			Time.milliseconds(500L));
@@ -170,6 +174,7 @@ public class TaskExecutorITCase extends TestLogger {
 
 		rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class));
 		rpcService.registerGateway(jmAddress, jmGateway);
+		rpcService.registerGateway(taskExecutor.getAddress(), taskExecutor.getSelfGateway(TaskExecutorGateway.class));
 
 		final AllocationID allocationId = new AllocationID();
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress);
@@ -179,27 +184,31 @@ public class TaskExecutorITCase extends TestLogger {
 			resourceManager.start();
 			taskExecutor.start();
 
+			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
 			// notify the RM that it is the leader
 			rmLeaderElectionService.isLeader(rmLeaderId);
 
 			// notify the TM about the new RM leader
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
 
-			CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> registrationResponseFuture = rmGateway.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,
 				jmAddress,
 				jobId,
-				Time.milliseconds(0L));
+				timeout);
 
 			RegistrationResponse registrationResponse = registrationResponseFuture.get();
 
 			assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess);
 
-			resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest, Time.milliseconds(0L));
+			CompletableFuture<Acknowledge> slotAck = rmGateway.requestSlot(jmLeaderId, rmLeaderId, slotRequest, timeout);
+
+			slotAck.get();
 
-			verify(jmGateway).offerSlots(
+			verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
 				eq(taskManagerResourceId),
 				(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
 				eq(jmLeaderId), any(Time.class));