You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:23 UTC
[06/11] flink git commit: [FLINK-7956] [flip6] Add support for queued
scheduling with slot sharing to SlotPool
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
deleted file mode 100644
index 60e1d34..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.clock.Clock;
-import org.apache.flink.runtime.util.clock.SystemClock;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
-
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the SlotPool using a proper RPC setup.
- */
-public class SlotPoolRpcTest extends TestLogger {
-
- private static RpcService rpcService;
-
- private static final Time timeout = Time.seconds(10L);
-
- // ------------------------------------------------------------------------
- // setup
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void setup() {
- ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
- rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
- }
-
- @AfterClass
- public static void shutdown() {
- if (rpcService != null) {
- rpcService.stopService();
- rpcService = null;
- }
- }
-
- // ------------------------------------------------------------------------
- // tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testSlotAllocationNoResourceManager() throws Exception {
- final JobID jid = new JobID();
-
- final SlotPool pool = new SlotPool(
- rpcService,
- jid,
- SystemClock.getInstance(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- Time.milliseconds(10L) // this is the timeout for the request tested here
- );
-
- try {
- pool.start(JobMasterId.generate(), "foobar");
-
- CompletableFuture<LogicalSlot> future = pool.allocateSlot(
- new SlotRequestID(),
- new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
- DEFAULT_TESTING_PROFILE,
- Collections.emptyList(),
- TestingUtils.infiniteTime());
-
- try {
- future.get();
- fail("We expected an ExecutionException.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
- }
- } finally {
- RpcUtils.terminateRpcEndpoint(pool, timeout);
- }
- }
-
- @Test
- public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
- final JobID jid = new JobID();
-
- final TestingSlotPool pool = new TestingSlotPool(
- rpcService,
- jid,
- SystemClock.getInstance(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime());
-
- try {
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
- SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
- requestId,
- new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
- DEFAULT_TESTING_PROFILE,
- Collections.emptyList(),
- Time.milliseconds(10L));
-
- try {
- future.get();
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
- }
-
- assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
-
- slotPoolGateway.cancelSlotRequest(requestId).get();
-
- assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
- } finally {
- RpcUtils.terminateRpcEndpoint(pool, timeout);
- }
- }
-
- @Test
- public void testCancelSlotAllocationWithResourceManager() throws Exception {
- final JobID jid = new JobID();
-
- final TestingSlotPool pool = new TestingSlotPool(
- rpcService,
- jid,
- SystemClock.getInstance(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime());
-
- try {
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
- ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- pool.connectToResourceManager(resourceManagerGateway);
-
- SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
- requestId,
- new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
- DEFAULT_TESTING_PROFILE,
- Collections.emptyList(),
- Time.milliseconds(10L));
-
- try {
- future.get();
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
- }
-
- assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
-
- slotPoolGateway.cancelSlotRequest(requestId).get();
- assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
- } finally {
- RpcUtils.terminateRpcEndpoint(pool, timeout);
- }
- }
-
- /**
- * Tests that cancelling a slot request which has already been fulfilled moves the slot from the allocated to the available slots.
- */
- @Test
- public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
- final JobID jid = new JobID();
-
- final TestingSlotPool pool = new TestingSlotPool(
- rpcService,
- jid,
- SystemClock.getInstance(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime());
-
- try {
- pool.start(JobMasterId.generate(), "foobar");
- SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
- final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
-
- TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- resourceManagerGateway.setRequestSlotConsumer(
- (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
- pool.connectToResourceManager(resourceManagerGateway);
-
- SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
- requestId,
- new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
- DEFAULT_TESTING_PROFILE,
- Collections.emptyList(),
- Time.milliseconds(10L));
-
- try {
- future.get();
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
- }
-
- AllocationID allocationId = allocationIdFuture.get();
- final SlotOffer slotOffer = new SlotOffer(
- allocationId,
- 0,
- DEFAULT_TESTING_PROFILE);
- final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
- final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
-
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
-
- assertTrue(pool.containsAllocatedSlot(allocationId).get());
-
- pool.cancelSlotRequest(requestId).get();
-
- assertFalse(pool.containsAllocatedSlot(allocationId).get());
- assertTrue(pool.containsAvailableSlot(allocationId).get());
- } finally {
- RpcUtils.terminateRpcEndpoint(pool, timeout);
- }
- }
-
- /**
- * This case makes sure that when allocateSlot in ProviderAndOwner times out,
- * cancelSlotAllocation is called automatically, because future.whenComplete is injected in ProviderAndOwner.
- */
- @Test
- public void testProviderAndOwner() throws Exception {
- final JobID jid = new JobID();
-
- final TestingSlotPool pool = new TestingSlotPool(
- rpcService,
- jid,
- SystemClock.getInstance(),
- Time.milliseconds(10L),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime());
-
- final CompletableFuture<SlotRequestID> cancelFuture = new CompletableFuture<>();
-
- pool.setCancelSlotAllocationConsumer(
- slotRequestID -> cancelFuture.complete(slotRequestID));
-
- try {
- pool.start(JobMasterId.generate(), "foobar");
- ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- pool.connectToResourceManager(resourceManagerGateway);
-
- ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
-
- // test the pending request is clear when timed out
- CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
- mockScheduledUnit,
- true,
- Collections.emptyList());
-
- try {
- future.get();
- fail("We expected a AskTimeoutException.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
- }
-
- // wait for the cancel call on the SlotPool
- cancelFuture.get();
-
- assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
- } finally {
- RpcUtils.terminateRpcEndpoint(pool, timeout);
- }
- }
-
- /**
- * Testing SlotPool which exposes internal state via some testing methods.
- */
- private static final class TestingSlotPool extends SlotPool {
-
- private volatile Consumer<SlotRequestID> cancelSlotAllocationConsumer;
-
- public TestingSlotPool(
- RpcService rpcService,
- JobID jobId,
- Clock clock,
- Time slotRequestTimeout,
- Time resourceManagerAllocationTimeout,
- Time resourceManagerRequestTimeout) {
- super(
- rpcService,
- jobId,
- clock,
- slotRequestTimeout,
- resourceManagerAllocationTimeout,
- resourceManagerRequestTimeout);
-
- cancelSlotAllocationConsumer = null;
- }
-
- public void setCancelSlotAllocationConsumer(Consumer<SlotRequestID> cancelSlotAllocationConsumer) {
- this.cancelSlotAllocationConsumer = Preconditions.checkNotNull(cancelSlotAllocationConsumer);
- }
-
- @Override
- public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
- final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
-
- if (currentCancelSlotAllocationConsumer != null) {
- currentCancelSlotAllocationConsumer.accept(slotRequestId);
- }
-
- return super.cancelSlotRequest(slotRequestId);
- }
-
- CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
- return callAsync(
- () -> getAllocatedSlots().contains(allocationId),
- timeout);
- }
-
- CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
- return callAsync(
- () -> getAvailableSlots().contains(allocationId),
- timeout);
- }
-
- CompletableFuture<Integer> getNumberOfPendingRequests() {
- return callAsync(
- () -> getPendingRequests().size(),
- timeout);
- }
-
- CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
- return callAsync(
- () -> getWaitingForResourceManager().size(),
- timeout);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
deleted file mode 100644
index 1af9cce..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_MOCKS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SlotPoolTest extends TestLogger {
-
- private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
-
- private final Time timeout = Time.seconds(10L);
-
- private RpcService rpcService;
-
- private JobID jobId;
-
- private TaskManagerLocation taskManagerLocation;
-
- private TaskManagerGateway taskManagerGateway;
-
- @Before
- public void setUp() throws Exception {
- this.rpcService = new TestingRpcService();
- this.jobId = new JobID();
-
- taskManagerLocation = new LocalTaskManagerLocation();
- taskManagerGateway = new SimpleAckingTaskManagerGateway();
- }
-
- @After
- public void tearDown() throws Exception {
- rpcService.stopService();
- }
-
- @Test
- public void testAllocateSimpleSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
- try {
- SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
- SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- assertFalse(future.isDone());
-
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
- final SlotOffer slotOffer = new SlotOffer(
- slotRequest.getAllocationId(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
- assertTrue(future.isDone());
- assertTrue(slot.isAlive());
- assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
- } finally {
- slotPool.shutDown();
- }
- }
-
- @Test
- public void testAllocationFulfilledByReturnedSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
- try {
- SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- slotPool.registerTaskManager(taskManagerLocation.getResourceID());
-
- CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
- assertFalse(future1.isDone());
- assertFalse(future2.isDone());
-
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
- .requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
-
- final SlotOffer slotOffer = new SlotOffer(
- slotRequests.get(0).getAllocationId(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
- assertTrue(future1.isDone());
- assertFalse(future2.isDone());
-
- // return this slot to pool
- slot1.releaseSlot();
-
- // second allocation fulfilled by previous slot returning
- LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
- assertTrue(future2.isDone());
-
- assertNotEquals(slot1, slot2);
- assertFalse(slot1.isAlive());
- assertTrue(slot2.isAlive());
- assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
- assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
- assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
- } finally {
- slotPool.shutDown();
- }
- }
-
- @Test
- public void testAllocateWithFreeSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
- try {
- SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
- CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- assertFalse(future1.isDone());
-
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
- final SlotOffer slotOffer = new SlotOffer(
- slotRequest.getAllocationId(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
- assertTrue(future1.isDone());
-
- // return this slot to pool
- slot1.releaseSlot();
-
- CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
- // second allocation fulfilled by previous slot returning
- LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
- assertTrue(future2.isDone());
-
- assertNotEquals(slot1, slot2);
- assertFalse(slot1.isAlive());
- assertTrue(slot2.isAlive());
- assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
- assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
- } finally {
- slotPool.shutDown();
- }
- }
-
- @Test
- public void testOfferSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
- try {
- SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
- CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- assertFalse(future.isDone());
-
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
- final SlotOffer slotOffer = new SlotOffer(
- slotRequest.getAllocationId(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
-
- // slot from unregistered resource
- assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
-
- final SlotOffer nonRequestedSlotOffer = new SlotOffer(
- new AllocationID(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- // we'll also accept non requested slots
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
-
- // accepted slot
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
- LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- assertTrue(slot.isAlive());
-
- // duplicated offer with using slot
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
- assertTrue(slot.isAlive());
-
- // duplicated offer with free slot
- slot.releaseSlot();
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
- } finally {
- slotPool.shutDown();
- }
- }
-
- @Test
- public void testReleaseResource() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-
- final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
-
- final SlotPool slotPool = new SlotPool(rpcService, jobId) {
- @Override
- public void returnAllocatedSlot(SlotRequestID slotRequestId) {
- super.returnAllocatedSlot(slotRequestId);
-
- slotReturnFuture.complete(true);
- }
- };
-
- try {
- SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
- CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
- CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
- final SlotOffer slotOffer = new SlotOffer(
- slotRequest.getAllocationId(),
- 0,
- DEFAULT_TESTING_PROFILE);
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
- assertTrue(future1.isDone());
- assertFalse(future2.isDone());
-
- slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
-
- // wait until the slot has been returned
- slotReturnFuture.get();
-
- assertFalse(slot1.isAlive());
-
- // slot released and not usable, second allocation still not fulfilled
- Thread.sleep(10);
- assertFalse(future2.isDone());
- } finally {
- slotPool.shutDown();
- }
- }
-
- /**
- * Tests that a slot request is cancelled if it failed with an exception (e.g. TimeoutException).
- *
- * <p>See FLINK-7870
- */
- @Test
- public void testSlotRequestCancellationUponFailingRequest() throws Exception {
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
- final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
- final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
- final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();
-
- final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
- resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
- resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
- resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));
-
- final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
-
- try {
- slotPool.start(JobMasterId.generate(), "localhost");
-
- final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
-
- slotPoolGateway.connectToResourceManager(resourceManagerGateway);
-
- CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
- new SlotRequestID(),
- scheduledUnit,
- ResourceProfile.UNKNOWN,
- Collections.emptyList(),
- timeout);
-
- requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));
-
- try {
- slotFuture.get();
- fail("The slot future should not have been completed properly.");
- } catch (Exception ignored) {
- // expected
- }
-
- // check that a failure triggered the slot request cancellation
- // with the correct allocation id
- assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get());
- } finally {
- try {
- RpcUtils.terminateRpcEndpoint(slotPool, timeout);
- } catch (Exception e) {
- LOG.warn("Could not properly terminate the SlotPool.", e);
- }
- }
- }
-
- /**
- * Tests that unused offered slots are directly used to fulfill pending slot
- * requests.
- *
- * <p>See FLINK-8089
- */
- @Test
- public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
- final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
- final JobMasterId jobMasterId = JobMasterId.generate();
- final String jobMasterAddress = "foobar";
- final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
- final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-
- resourceManagerGateway.setRequestSlotConsumer(
- (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
- final SlotRequestID slotRequestId1 = new SlotRequestID();
- final SlotRequestID slotRequestId2 = new SlotRequestID();
-
- try {
- slotPool.start(jobMasterId, jobMasterAddress);
-
- final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
-
- final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
-
- slotPoolGateway.connectToResourceManager(resourceManagerGateway);
-
- CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot(
- slotRequestId1,
- scheduledUnit,
- ResourceProfile.UNKNOWN,
- Collections.emptyList(),
- timeout);
-
- // wait for the first slot request
- final AllocationID allocationId = allocationIdFuture.get();
-
- CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot(
- slotRequestId2,
- scheduledUnit,
- ResourceProfile.UNKNOWN,
- Collections.emptyList(),
- timeout);
-
- slotPoolGateway.cancelSlotRequest(slotRequestId1);
-
- try {
- // this should fail with a CancellationException
- slotFuture1.get();
- fail("The first slot future should have failed because it was cancelled.");
- } catch (ExecutionException ee) {
- assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
- }
-
- final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
-
- slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
-
- assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
- // the slot offer should fulfill the second slot request
- assertEquals(allocationId, slotFuture2.get().getAllocationId());
- } finally {
- RpcUtils.terminateRpcEndpoint(slotPool, timeout);
- }
- }
-
- private static ResourceManagerGateway createResourceManagerGatewayMock() {
- ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
- when(resourceManagerGateway
- .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
-
- return resourceManagerGateway;
- }
-
- private static SlotPoolGateway setupSlotPool(
- SlotPool slotPool,
- ResourceManagerGateway resourceManagerGateway) throws Exception {
- final String jobManagerAddress = "foobar";
-
- slotPool.start(JobMasterId.generate(), jobManagerAddress);
-
- slotPool.connectToResourceManager(resourceManagerGateway);
-
- return slotPool.getSelfGateway(SlotPoolGateway.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
index 28cab72..2407c1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
deleted file mode 100644
index 2066017..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Simple testing logical slot for testing purposes.
- */
-public class TestingLogicalSlot implements LogicalSlot {
-
- private final TaskManagerLocation taskManagerLocation;
-
- private final TaskManagerGateway taskManagerGateway;
-
- private final AtomicReference<Payload> payloadReference;
-
- private final int slotNumber;
-
- private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-
- private final AllocationID allocationId;
-
- private final SlotRequestID slotRequestId;
-
- public TestingLogicalSlot() {
- this(
- new LocalTaskManagerLocation(),
- new SimpleAckingTaskManagerGateway(),
- 0,
- new AllocationID(),
- new SlotRequestID());
- }
-
- public TestingLogicalSlot(
- TaskManagerLocation taskManagerLocation,
- TaskManagerGateway taskManagerGateway,
- int slotNumber,
- AllocationID allocationId,
- SlotRequestID slotRequestId) {
- this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
- this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
- this.payloadReference = new AtomicReference<>();
- this.slotNumber = slotNumber;
- this.allocationId = Preconditions.checkNotNull(allocationId);
- this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
- }
-
- @Override
- public TaskManagerLocation getTaskManagerLocation() {
- return taskManagerLocation;
- }
-
- @Override
- public TaskManagerGateway getTaskManagerGateway() {
- return taskManagerGateway;
- }
-
- @Override
- public boolean isAlive() {
- return !releaseFuture.isDone();
- }
-
- @Override
- public boolean tryAssignPayload(Payload payload) {
- return payloadReference.compareAndSet(null, payload);
- }
-
- @Nullable
- @Override
- public Payload getPayload() {
- return payloadReference.get();
- }
-
- @Override
- public CompletableFuture<?> releaseSlot() {
- releaseFuture.complete(null);
-
- return releaseFuture;
- }
-
- @Override
- public int getPhysicalSlotNumber() {
- return slotNumber;
- }
-
- @Override
- public AllocationID getAllocationId() {
- return allocationId;
- }
-
- @Override
- public SlotRequestID getSlotRequestId() {
- return slotRequestId;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
deleted file mode 100644
index 3369882..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Simple payload implementation for testing purposes.
- */
-public class TestingPayload implements LogicalSlot.Payload {
-
- private final CompletableFuture<?> terminationFuture;
-
- public TestingPayload() {
- this.terminationFuture = new CompletableFuture<>();
- }
-
-
- @Override
- public void fail(Throwable cause) {
- terminationFuture.complete(null);
- }
-
- @Override
- public CompletableFuture<?> getTerminalStateFuture() {
- return terminationFuture;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index d40ff61..77d162f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -143,7 +143,7 @@ public class CoLocationConstraintTest {
assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
// release the slot
- slot2_1.releaseInstanceSlot();
+ slot2_1.releaseSlot();
// we should still have a location
assertTrue(constraint.isAssigned());