You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/10 10:24:20 UTC
[2/3] flink git commit: [FLINK-6434] [tests] Harden and speed up SlotPoolRpcTest
[FLINK-6434] [tests] Harden and speed up SlotPoolRpcTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f9eb519
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f9eb519
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f9eb519
Branch: refs/heads/master
Commit: 2f9eb5194acc2b8f29a6bf0c97a4230811a3db99
Parents: f748197
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 9 16:37:16 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 10 11:23:23 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 75 +++--
.../flink/runtime/instance/SlotPoolGateway.java | 22 +-
.../flink/runtime/instance/SlotPoolRpcTest.java | 329 ++++++++++++-------
3 files changed, 278 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index b033319..159df7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -42,10 +42,13 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -279,23 +282,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
- public void cancelSlotAllocation(AllocationID allocationID) {
- if (waitingForResourceManager.remove(allocationID) == null) {
+ public CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId) {
+ final PendingRequest pendingRequest = removePendingRequest(allocationId);
- PendingRequest request = pendingRequests.remove(allocationID);
- if (request != null) {
- failPendingRequest(request, new CancellationException("Allocation " + allocationID + " cancelled"));
- } else {
+ if (pendingRequest != null) {
+ failPendingRequest(pendingRequest, new CancellationException("Allocation " + allocationId + " cancelled."));
+ } else {
+ final Slot slot = allocatedSlots.get(allocationId);
- Slot slot = allocatedSlots.get(allocationID);
- if (slot != null) {
- LOG.info("Return allocated slot {} by cancelling allocation {}.", slot, allocationID);
- if (slot.markCancelled()) {
- internalReturnAllocatedSlot(slot);
- }
+ if (slot != null) {
+ LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, allocationId);
+ if (slot.markCancelled()) {
+ internalReturnAllocatedSlot(slot);
}
+ } else {
+ LOG.debug("There was no slot allocation with {} to be cancelled.", allocationId);
}
}
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
CompletableFuture<SimpleSlot> internalAllocateSlot(
@@ -328,6 +333,28 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return future;
}
+ /**
+ * Checks whether there exists a pending request with the given allocation id and removes it
+ * from the internal data structures.
+ *
+ * @param allocationId identifying the pending request
+ * @return pending request if there is one, otherwise null
+ */
+ @Nullable
+ private PendingRequest removePendingRequest(AllocationID allocationId) {
+ PendingRequest result = waitingForResourceManager.remove(allocationId);
+
+ if (result != null) {
+ // sanity check
+ assert !pendingRequests.containsKey(allocationId) : "A pending requests should only be part of either " +
+ "the pendingRequests or waitingForResourceManager but not both.";
+
+ return result;
+ } else {
+ return pendingRequests.remove(allocationId);
+ }
+ }
+
private void requestSlotFromResourceManager(
final AllocationID allocationID,
final CompletableFuture<SimpleSlot> future,
@@ -396,6 +423,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
+ Preconditions.checkNotNull(pendingRequest);
+ Preconditions.checkNotNull(e);
+
if (!pendingRequest.getFuture().isDone()) {
pendingRequest.getFuture().completeExceptionally(e);
}
@@ -422,8 +452,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
PendingRequest request = waitingForResourceManager.remove(allocationID);
if (request != null) {
- failPendingRequest(request, new NoResourceAvailableException(
- "No slot available and no connection to Resource Manager established."));
+ failPendingRequest(
+ request,
+ new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
}
}
@@ -632,10 +663,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
* Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
*
* @param resourceID The id of the TaskManager
+ * @return Future acknowledge if the operation was successful
*/
@Override
- public void registerTaskManager(final ResourceID resourceID) {
+ public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
registeredTaskManagers.add(resourceID);
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
@@ -684,13 +718,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return availableSlots;
}
- public CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
- return CompletableFuture.completedFuture(waitingForResourceManager.size());
+ @VisibleForTesting
+ public HashMap<AllocationID, PendingRequest> getPendingRequests() {
+ return pendingRequests;
}
- @Override
- public CompletableFuture<Integer> getNumberOfPendingRequests() {
- return CompletableFuture.completedFuture(pendingRequests.size());
+ @VisibleForTesting
+ public HashMap<AllocationID, PendingRequest> getWaitingForResourceManager() {
+ return waitingForResourceManager;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 02d5d38..184072a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -72,7 +71,7 @@ public interface SlotPoolGateway extends RpcGateway {
// registering / un-registering TaskManagers and slots
// ------------------------------------------------------------------------
- void registerTaskManager(ResourceID resourceID);
+ CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
@@ -96,20 +95,11 @@ public interface SlotPoolGateway extends RpcGateway {
void returnAllocatedSlot(Slot slot);
/**
- * Cancel a slot allocation.
- * This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally.
+ * Cancel a slot allocation. This method should be called when the CompletableFuture returned by
+ * allocateSlot completed exceptionally.
*
- * @param allocationID the unique id for the previous allocation
+ * @param allocationId identifying the slot allocation request
+ * @return Future acknowledge if the slot allocation has been cancelled
*/
- void cancelSlotAllocation(AllocationID allocationID);
-
- // ------------------------------------------------------------------------
- // exposing internal statistic, mainly for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- CompletableFuture<Integer> getNumberOfWaitingForResourceRequests();
-
- @VisibleForTesting
- CompletableFuture<Integer> getNumberOfPendingRequests();
+ CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f9eb519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b521b75..f81366a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.instance;
-import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
@@ -34,17 +32,23 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.pattern.AskTimeoutException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.junit.Assert.assertEquals;
@@ -60,6 +64,8 @@ public class SlotPoolRpcTest extends TestLogger {
private static RpcService rpcService;
+ private static final Time timeout = Time.seconds(10L);
+
// ------------------------------------------------------------------------
// setup
// ------------------------------------------------------------------------
@@ -72,7 +78,10 @@ public class SlotPoolRpcTest extends TestLogger {
@AfterClass
public static void shutdown() {
- rpcService.stopService();
+ if (rpcService != null) {
+ rpcService.stopService();
+ rpcService = null;
+ }
}
// ------------------------------------------------------------------------
@@ -84,27 +93,32 @@ public class SlotPoolRpcTest extends TestLogger {
final JobID jid = new JobID();
final SlotPool pool = new SlotPool(
- rpcService, jid,
- SystemClock.getInstance(),
- Time.days(1), Time.days(1),
- Time.milliseconds(100) // this is the timeout for the request tested here
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ Time.milliseconds(10L) // this is the timeout for the request tested here
);
- pool.start(JobMasterId.generate(), "foobar");
-
- CompletableFuture<SimpleSlot> future = pool.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1));
try {
- future.get(4, TimeUnit.SECONDS);
- fail("We expected a ExecutionException.");
- }
- catch (ExecutionException e) {
- assertEquals(NoResourceAvailableException.class, e.getCause().getClass());
- }
- catch (TimeoutException e) {
- fail("future timed out rather than being failed");
- }
- catch (Exception e) {
- fail("wrong exception: " + e);
+ pool.start(JobMasterId.generate(), "foobar");
+
+ CompletableFuture<SimpleSlot> future = pool.allocateSlot(
+ new AllocationID(),
+ mock(ScheduledUnit.class),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ TestingUtils.infiniteTime());
+
+ try {
+ future.get();
+ fail("We expected an ExecutionException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
}
}
@@ -112,101 +126,137 @@ public class SlotPoolRpcTest extends TestLogger {
public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
final JobID jid = new JobID();
- final SlotPool pool = new SlotPool(
- rpcService, jid,
- SystemClock.getInstance(),
- Time.days(1), Time.days(1),
- Time.seconds(1) // this is the timeout for the request tested here
- );
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
- AllocationID allocationID = new AllocationID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
try {
- future.get(500, TimeUnit.MILLISECONDS);
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof AskTimeoutException);
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+ AllocationID allocationID = new AllocationID();
+ CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ allocationID,
+ mock(ScheduledUnit.class),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+
+ slotPoolGateway.cancelSlotAllocation(allocationID).get();
+
+ assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
}
-
- assertEquals(1, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
-
- pool.cancelSlotAllocation(allocationID);
- assertEquals(0, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue());
}
@Test
public void testCancelSlotAllocationWithResourceManager() throws Exception {
final JobID jid = new JobID();
- final SlotPool pool = new SlotPool(
- rpcService, jid,
- SystemClock.getInstance(),
- Time.days(1), Time.days(1),
- Time.seconds(1) // this is the timeout for the request tested here
- );
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
- ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- pool.connectToResourceManager(resourceManagerGateway);
-
- AllocationID allocationID = new AllocationID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
try {
- future.get(500, TimeUnit.MILLISECONDS);
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof AskTimeoutException);
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+ ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ AllocationID allocationID = new AllocationID();
+ CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ allocationID,
+ mock(ScheduledUnit.class),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
+
+ slotPoolGateway.cancelSlotAllocation(allocationID).get();
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
}
-
- assertEquals(1, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
-
- pool.cancelSlotAllocation(allocationID);
- assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
}
@Test
public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
final JobID jid = new JobID();
- final SlotPool pool = new SlotPool(
- rpcService, jid,
- SystemClock.getInstance(),
- Time.days(1), Time.days(1),
- Time.seconds(1) // this is the timeout for the request tested here
- );
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
- ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- pool.connectToResourceManager(resourceManagerGateway);
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
- AllocationID allocationID = new AllocationID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100));
+ ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ pool.connectToResourceManager(resourceManagerGateway);
- try {
- future.get(500, TimeUnit.MILLISECONDS);
- fail("We expected a AskTimeoutException.");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof AskTimeoutException);
- }
+ AllocationID allocationId = new AllocationID();
+ CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ allocationId,
+ mock(ScheduledUnit.class),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ ResourceID resourceID = ResourceID.generate();
+ AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
+ slotPoolGateway.registerTaskManager(resourceID).get();
- ResourceID resourceID = ResourceID.generate();
- AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID, jid, DEFAULT_TESTING_PROFILE);
- slotPoolGateway.registerTaskManager(resourceID);
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue());
- assertTrue(pool.getAllocatedSlots().contains(allocationID));
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
- pool.cancelSlotAllocation(allocationID);
- assertFalse(pool.getAllocatedSlots().contains(allocationID));
- assertTrue(pool.getAvailableSlots().contains(allocationID));
+ assertTrue(pool.containsAllocatedSlot(allocationId).get());
+
+ pool.cancelSlotAllocation(allocationId).get();
+
+ assertFalse(pool.containsAllocatedSlot(allocationId).get());
+ assertTrue(pool.containsAvailableSlot(allocationId).get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
}
/**
@@ -217,29 +267,84 @@ public class SlotPoolRpcTest extends TestLogger {
public void testProviderAndOwner() throws Exception {
final JobID jid = new JobID();
- final SlotPool pool = new SlotPool(
- rpcService, jid,
- SystemClock.getInstance(),
- Time.milliseconds(100), Time.days(1),
- Time.seconds(1) // this is the timeout for the request tested here
- );
- pool.start(JobMasterId.generate(), "foobar");
- ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- pool.connectToResourceManager(resourceManagerGateway);
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ Time.milliseconds(10L),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
- ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+
+ // test the pending request is clear when timed out
+ CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(
+ mockScheduledUnit,
+ true,
+ Collections.emptyList());
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
- // test the pending request is clear when timed out
- CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(mockScheduledUnit, true, null);
+ /**
+ * Testing SlotPool which exposes internal state via some testing methods.
+ */
+ private static final class TestingSlotPool extends SlotPool {
+
+ public TestingSlotPool(
+ RpcService rpcService,
+ JobID jobId,
+ Clock clock,
+ Time slotRequestTimeout,
+ Time resourceManagerAllocationTimeout,
+ Time resourceManagerRequestTimeout) {
+ super(
+ rpcService,
+ jobId,
+ clock,
+ slotRequestTimeout,
+ resourceManagerAllocationTimeout,
+ resourceManagerRequestTimeout);
+ }
- try {
- future.get(500, TimeUnit.MILLISECONDS);
- fail("We expected a AskTimeoutException.");
+ CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
+ return callAsync(
+ () -> getAllocatedSlots().contains(allocationId),
+ timeout);
}
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof AskTimeoutException);
+
+ CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
+ return callAsync(
+ () -> getAvailableSlots().contains(allocationId),
+ timeout);
+ }
+
+ CompletableFuture<Integer> getNumberOfPendingRequests() {
+ return callAsync(
+ () -> getPendingRequests().size(),
+ timeout);
}
- assertEquals(0, pool.getSelfGateway(SlotPoolGateway.class).getNumberOfPendingRequests().get().intValue());
+ CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
+ return callAsync(
+ () -> getWaitingForResourceManager().size(),
+ timeout);
+ }
}
+
}