You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/10 10:24:20 UTC

[2/3] flink git commit: [FLINK-6434] [tests] Harden and speed up SlotPoolRpcTest

[FLINK-6434] [tests] Harden and speed up SlotPoolRpcTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f9eb519
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f9eb519
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f9eb519

Branch: refs/heads/master
Commit: 2f9eb5194acc2b8f29a6bf0c97a4230811a3db99
Parents: f748197
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 9 16:37:16 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 10 11:23:23 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  75 +++--
 .../flink/runtime/instance/SlotPoolGateway.java |  22 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java | 329 ++++++++++++-------
 3 files changed, 278 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index b033319..159df7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -42,10 +42,13 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -279,23 +282,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public void cancelSlotAllocation(AllocationID allocationID) {
-		if (waitingForResourceManager.remove(allocationID) == null) {
+	public CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId) {
+		final PendingRequest pendingRequest = removePendingRequest(allocationId);
 
-			PendingRequest request = pendingRequests.remove(allocationID);
-			if (request != null) {
-				failPendingRequest(request, new CancellationException("Allocation " + allocationID + " cancelled"));
-			} else {
+		if (pendingRequest != null) {
+			failPendingRequest(pendingRequest, new CancellationException("Allocation " + allocationId + " cancelled."));
+		} else {
+			final Slot slot = allocatedSlots.get(allocationId);
 
-				Slot slot = allocatedSlots.get(allocationID);
-				if (slot != null) {
-					LOG.info("Return allocated slot {} by cancelling allocation {}.", slot, allocationID);
-					if (slot.markCancelled()) {
-						internalReturnAllocatedSlot(slot);
-					}
+			if (slot != null) {
+				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, allocationId);
+				if (slot.markCancelled()) {
+					internalReturnAllocatedSlot(slot);
 				}
+			} else {
+				LOG.debug("There was no slot allocation with {} to be cancelled.", allocationId);
 			}
 		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	CompletableFuture<SimpleSlot> internalAllocateSlot(
@@ -328,6 +333,28 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		return future;
 	}
 
+	/**
+	 * Checks whether there exists a pending request with the given allocation id and removes it
+	 * from the internal data structures.
+	 *
+	 * @param allocationId identifying the pending request
+	 * @return pending request if there is one, otherwise null
+	 */
+	@Nullable
+	private PendingRequest removePendingRequest(AllocationID allocationId) {
+		PendingRequest result = waitingForResourceManager.remove(allocationId);
+
+		if (result != null) {
+			// sanity check
+			assert !pendingRequests.containsKey(allocationId) : "A pending requests should only be part of either " +
+				"the pendingRequests or waitingForResourceManager but not both.";
+
+			return result;
+		} else {
+			return pendingRequests.remove(allocationId);
+		}
+	}
+
 	private void requestSlotFromResourceManager(
 			final AllocationID allocationID,
 			final CompletableFuture<SimpleSlot> future,
@@ -396,6 +423,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
+		Preconditions.checkNotNull(pendingRequest);
+		Preconditions.checkNotNull(e);
+
 		if (!pendingRequest.getFuture().isDone()) {
 			pendingRequest.getFuture().completeExceptionally(e);
 		}
@@ -422,8 +452,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
 		PendingRequest request = waitingForResourceManager.remove(allocationID);
 		if (request != null) {
-			failPendingRequest(request, new NoResourceAvailableException(
-					"No slot available and no connection to Resource Manager established."));
+			failPendingRequest(
+				request,
+				new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
 		}
 	}
 
@@ -632,10 +663,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
 	 *
 	 * @param resourceID The id of the TaskManager
+	 * @return Future acknowledge if th operation was successful
 	 */
 	@Override
-	public void registerTaskManager(final ResourceID resourceID) {
+	public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
 		registeredTaskManagers.add(resourceID);
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	/**
@@ -684,13 +718,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		return availableSlots;
 	}
 
-	public CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
-		return CompletableFuture.completedFuture(waitingForResourceManager.size());
+	@VisibleForTesting
+	public HashMap<AllocationID, PendingRequest> getPendingRequests() {
+		return pendingRequests;
 	}
 
-	@Override
-	public CompletableFuture<Integer> getNumberOfPendingRequests() {
-		return CompletableFuture.completedFuture(pendingRequests.size());
+	@VisibleForTesting
+	public HashMap<AllocationID, PendingRequest> getWaitingForResourceManager() {
+		return waitingForResourceManager;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 02d5d38..184072a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -72,7 +71,7 @@ public interface SlotPoolGateway extends RpcGateway {
 	//  registering / un-registering TaskManagers and slots
 	// ------------------------------------------------------------------------
 
-	void registerTaskManager(ResourceID resourceID);
+	CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
 
 	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
 
@@ -96,20 +95,11 @@ public interface SlotPoolGateway extends RpcGateway {
 	void returnAllocatedSlot(Slot slot);
 
 	/**
-	 * Cancel a slot allocation.
-	 * This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally.
+	 * Cancel a slot allocation. This method should be called when the CompletableFuture returned by
+	 * allocateSlot completed exceptionally.
 	 *
-	 * @param allocationID the unique id for the previous allocation
+	 * @param allocationId identifying the slot allocation request
+	 * @return Future acknowledge if the slot allocation has been cancelled
 	 */
-	void cancelSlotAllocation(AllocationID allocationID);
-
-	// ------------------------------------------------------------------------
-	//  exposing internal statistic, mainly for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	CompletableFuture<Integer> getNumberOfWaitingForResourceRequests();
-
-	@VisibleForTesting
-	CompletableFuture<Integer> getNumberOfPendingRequests();
+	CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b521b75..f81366a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -34,17 +32,23 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.pattern.AskTimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
@@ -60,6 +64,8 @@ public class SlotPoolRpcTest extends TestLogger {
 
 	private static RpcService rpcService;
 
+	private static final Time timeout = Time.seconds(10L);
+
 	// ------------------------------------------------------------------------
 	//  setup
 	// ------------------------------------------------------------------------
@@ -72,7 +78,10 @@ public class SlotPoolRpcTest extends TestLogger {
 
 	@AfterClass
 	public static  void shutdown() {
-		rpcService.stopService();
+		if (rpcService != null) {
+			rpcService.stopService();
+			rpcService = null;
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -84,27 +93,32 @@ public class SlotPoolRpcTest extends TestLogger {
 		final JobID jid = new JobID();
 		
 		final SlotPool pool = new SlotPool(
-				rpcService, jid,
-				SystemClock.getInstance(),
-				Time.days(1), Time.days(1),
-				Time.milliseconds(100) // this is the timeout for the request tested here
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(10L) // this is the timeout for the request tested here
 		);
-		pool.start(JobMasterId.generate(), "foobar");
-
-		CompletableFuture<SimpleSlot> future = pool.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1));
 
 		try {
-			future.get(4, TimeUnit.SECONDS);
-			fail("We expected a ExecutionException.");
-		}
-		catch (ExecutionException e) {
-			assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
-		}
-		catch (TimeoutException e) {
-			fail("future timed out rather than being failed");
-		}
-		catch (Exception e) {
-			fail("wrong exception: " + e);
+			pool.start(JobMasterId.generate(), "foobar");
+
+			CompletableFuture<SimpleSlot> future = pool.allocateSlot(
+				new AllocationID(),
+				mock(ScheduledUnit.class),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				TestingUtils.infiniteTime());
+
+			try {
+				future.get();
+				fail("We expected an ExecutionException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
 		}
 	}
 
@@ -112,101 +126,137 @@ public class SlotPoolRpcTest extends TestLogger {
 	public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
 		final JobID jid = new JobID();
 
-		final SlotPool pool = new SlotPool(
-				rpcService, jid,
-				SystemClock.getInstance(),
-				Time.days(1), Time.days(1),
-				Time.seconds(1) // this is the timeout for the request tested here
-		);
-		pool.start(JobMasterId.generate(), "foobar");
-		SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
-		AllocationID allocationID = new AllocationID();
-		CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		try {
-			future.get(500, TimeUnit.MILLISECONDS);
-			fail("We expected a AskTimeoutException.");
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof AskTimeoutException);
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+			AllocationID allocationID = new AllocationID();
+			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+				allocationID,
+				mock(ScheduledUnit.class),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+
+			slotPoolGateway.cancelSlotAllocation(allocationID).get();
+
+			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
 		}
-
-		assertEquals(1, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
-
-		pool.cancelSlotAllocation(allocationID);
-		assertEquals(0, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
 	}
 
 	@Test
 	public void testCancelSlotAllocationWithResourceManager() throws Exception {
 		final JobID jid = new JobID();
 
-		final SlotPool pool = new SlotPool(
-				rpcService, jid,
-				SystemClock.getInstance(),
-				Time.days(1), Time.days(1),
-				Time.seconds(1) // this is the timeout for the request tested here
-		);
-		pool.start(JobMasterId.generate(), "foobar");
-		SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
-		ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-		pool.connectToResourceManager(resourceManagerGateway);
-
-		AllocationID allocationID = new AllocationID();
-		CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		try {
-			future.get(500, TimeUnit.MILLISECONDS);
-			fail("We expected a AskTimeoutException.");
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof AskTimeoutException);
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			pool.connectToResourceManager(resourceManagerGateway);
+
+			AllocationID allocationID = new AllocationID();
+			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+				allocationID,
+				mock(ScheduledUnit.class),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
+
+			slotPoolGateway.cancelSlotAllocation(allocationID).get();
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
 		}
-
-		assertEquals(1, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
-
-		pool.cancelSlotAllocation(allocationID);
-		assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
 	}
 
 	@Test
 	public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
 		final JobID jid = new JobID();
 
-		final SlotPool pool = new SlotPool(
-				rpcService, jid,
-				SystemClock.getInstance(),
-				Time.days(1), Time.days(1),
-				Time.seconds(1) // this is the timeout for the request tested here
-		);
-		pool.start(JobMasterId.generate(), "foobar");
-		SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
-		ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-		pool.connectToResourceManager(resourceManagerGateway);
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
 
-		AllocationID allocationID = new AllocationID();
-		CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			pool.connectToResourceManager(resourceManagerGateway);
 
-		try {
-			future.get(500, TimeUnit.MILLISECONDS);
-			fail("We expected a AskTimeoutException.");
-		}
-		catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof AskTimeoutException);
-		}
+			AllocationID allocationId = new AllocationID();
+			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+				allocationId,
+				mock(ScheduledUnit.class),
+				DEFAULT_TESTING_PROFILE,
+				Collections.emptyList(),
+				Time.milliseconds(10L));
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			ResourceID resourceID = ResourceID.generate();
+			AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
+			slotPoolGateway.registerTaskManager(resourceID).get();
 
-		ResourceID resourceID = ResourceID.generate();
-		AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID, jid, DEFAULT_TESTING_PROFILE);
-		slotPoolGateway.registerTaskManager(resourceID);
-		assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-		assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
-		assertTrue(pool.getAllocatedSlots().contains(allocationID));
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 
-		pool.cancelSlotAllocation(allocationID);
-		assertFalse(pool.getAllocatedSlots().contains(allocationID));
-		assertTrue(pool.getAvailableSlots().contains(allocationID));
+			assertTrue(pool.containsAllocatedSlot(allocationId).get());
+
+			pool.cancelSlotAllocation(allocationId).get();
+
+			assertFalse(pool.containsAllocatedSlot(allocationId).get());
+			assertTrue(pool.containsAvailableSlot(allocationId).get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
 	}
 
 	/**
@@ -217,29 +267,84 @@ public class SlotPoolRpcTest extends TestLogger {
 	public void testProviderAndOwner() throws Exception {
 		final JobID jid = new JobID();
 
-		final SlotPool pool = new SlotPool(
-				rpcService, jid,
-				SystemClock.getInstance(),
-				Time.milliseconds(100), Time.days(1),
-				Time.seconds(1) // this is the timeout for the request tested here
-		);
-		pool.start(JobMasterId.generate(), "foobar");
-		ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-		pool.connectToResourceManager(resourceManagerGateway);
+		final TestingSlotPool pool = new TestingSlotPool(
+			rpcService,
+			jid,
+			SystemClock.getInstance(),
+			Time.milliseconds(10L),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
-		ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+		try {
+			pool.start(JobMasterId.generate(), "foobar");
+			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			pool.connectToResourceManager(resourceManagerGateway);
+
+			ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+
+			// test the pending request is clear when timed out
+			CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(
+				mockScheduledUnit,
+				true,
+				Collections.emptyList());
+
+			try {
+				future.get();
+				fail("We expected a AskTimeoutException.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(pool, timeout);
+		}
+	}
 
-		// test the pending request is clear when timed out
-		CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(mockScheduledUnit, true, null);
+	/**
+	 * Testing SlotPool which exposes internal state via some testing methods.
+	 */
+	private static final class TestingSlotPool extends SlotPool {
+
+		public TestingSlotPool(
+				RpcService rpcService,
+				JobID jobId,
+				Clock clock,
+				Time slotRequestTimeout,
+				Time resourceManagerAllocationTimeout,
+				Time resourceManagerRequestTimeout) {
+			super(
+				rpcService,
+				jobId,
+				clock,
+				slotRequestTimeout,
+				resourceManagerAllocationTimeout,
+				resourceManagerRequestTimeout);
+		}
 
-		try {
-			future.get(500, TimeUnit.MILLISECONDS);
-			fail("We expected a AskTimeoutException.");
+		CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
+			return callAsync(
+				() -> getAllocatedSlots().contains(allocationId),
+				timeout);
 		}
-		catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof AskTimeoutException);
+
+		CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
+			return callAsync(
+				() -> getAvailableSlots().contains(allocationId),
+				timeout);
+		}
+
+		CompletableFuture<Integer> getNumberOfPendingRequests() {
+			return callAsync(
+				() -> getPendingRequests().size(),
+				timeout);
 		}
 
-		assertEquals(0, pool.getSelfGateway(SlotPoolGateway.class).getNumberOfPendingRequests().get().intValue());
+		CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
+			return callAsync(
+				() -> getWaitingForResourceManager().size(),
+				timeout);
+		}
 	}
+
 }